You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/22 01:08:11 UTC

[flink] branch master updated: [FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f968c  [FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule
e6f968c is described below

commit e6f968c504ac323b8ae5da8a9545aae3ac69197c
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 22 09:07:48 2019 +0800

    [FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule
    
    This closes #8460
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |   4 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   4 +-
 .../plan/metadata/FlinkRelMdDistinctRowCount.scala |   4 +-
 .../plan/metadata/FlinkRelMdPopulationSize.scala   |   4 +-
 .../plan/metadata/FlinkRelMdUniqueGroups.scala     |   4 +-
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |   4 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala      |   4 +
 .../table/plan/rules/FlinkStreamRuleSets.scala     |   7 +-
 .../plan/rules/logical/CalcRankTransposeRule.scala | 197 +++++++++++++++
 .../rules/logical/RankNumberColumnRemoveRule.scala |  84 +++++++
 .../flink/table/plan/util/CorrelateUtil.scala      |   4 +-
 .../flink/table/plan/util/FlinkRelMdUtil.scala     |  12 +-
 .../flink/table/plan/util/FlinkRexUtil.scala       |  35 ++-
 .../apache/flink/table/plan/util/RankUtil.scala    |  10 +
 .../rules/logical/CalcRankTransposeRuleTest.xml    | 277 +++++++++++++++++++++
 .../FlinkLogicalRankRuleForRangeEndTest.xml        |  18 +-
 .../logical/RankNumberColumnRemoveRuleTest.xml     | 127 ++++++++++
 .../flink/table/plan/stream/sql/RankTest.xml       |  59 +++--
 .../rules/logical/CalcRankTransposeRuleTest.scala  | 190 ++++++++++++++
 .../logical/RankNumberColumnRemoveRuleTest.scala   |  96 +++++++
 20 files changed, 1077 insertions(+), 67 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
index d75558c..cc55906 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.plan.stats._
-import org.apache.flink.table.plan.util.{AggregateUtil, ColumnIntervalUtil, FlinkRelMdUtil, FlinkRelOptUtil}
+import org.apache.flink.table.plan.util.{AggregateUtil, ColumnIntervalUtil, FlinkRelOptUtil, RankUtil}
 import org.apache.flink.table.runtime.rank.{ConstantRankRange, VariableRankRange}
 import org.apache.flink.util.Preconditions
 
@@ -321,7 +321,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
       mq: RelMetadataQuery,
       index: Int): ValueInterval = {
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     if (index == rankFunColumnIndex) {
       rank.rankRange match {
         case r: ConstantRankRange => ValueInterval(r.getRankStart, r.getRankEnd)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 25754a9..1279a80 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.plan.nodes.logical._
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
 import org.apache.flink.table.runtime.rank.RankType
 import org.apache.flink.table.sources.TableSource
 
@@ -267,7 +267,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
     val input = rank.getInput
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     if (rankFunColumnIndex < 0) {
       mq.areColumnsUnique(input, columns, ignoreNulls)
     } else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
index 049bdaa..5b33318 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.{PlannerConfigOptions, TableException}
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, FlinkRexUtil}
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, FlinkRexUtil, RankUtil}
 import org.apache.flink.table.{JArrayList, JDouble}
 
 import org.apache.calcite.plan.RelOptUtil
@@ -254,7 +254,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
       mq: RelMetadataQuery,
       groupKey: ImmutableBitSet,
       predicate: RexNode): JDouble = {
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     val newGroupKey = groupKey.clearIf(rankFunColumnIndex, rankFunColumnIndex > 0)
     val (nonRankPred, rankPred) = FlinkRelMdUtil.splitPredicateOnRank(rank, predicate)
     val inputNdv: JDouble = if (newGroupKey.nonEmpty) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
index d927a6d..389d36a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
 import org.apache.flink.table.plan.nodes.physical.batch._
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
 import org.apache.flink.table.{JArrayList, JDouble}
 
 import org.apache.calcite.plan.volcano.RelSubset
@@ -179,7 +179,7 @@ class FlinkRelMdPopulationSize private extends MetadataHandler[BuiltInMetadata.P
       rel: Rank,
       mq: RelMetadataQuery,
       groupKey: ImmutableBitSet): JDouble = {
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rel).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
     if (rankFunColumnIndex < 0 || !groupKey.toArray.contains(rankFunColumnIndex)) {
       mq.getPopulationSize(rel.getInput, groupKey)
     } else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
index fe157e8..b559fe0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
 import org.apache.flink.table.plan.metadata.FlinkMetadata.UniqueGroups
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
 import org.apache.flink.table.plan.nodes.physical.batch._
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
 
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.RelNode
@@ -179,7 +179,7 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] {
       columns: ImmutableBitSet): ImmutableBitSet = {
     val columnList = columns.toList
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     val columnSkipRankCol = columnList.filter(_ != rankFunColumnIndex)
     if (columnSkipRankCol.isEmpty) {
       return columns
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
index 2c87feb..f1c9060 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.plan.nodes.logical._
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
 import org.apache.flink.table.runtime.rank.RankType
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.{JArrayList, JBoolean, JHashMap, JHashSet, JList, JSet}
@@ -227,7 +227,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
     val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
-    val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rel).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
     if (rankFunColumnIndex < 0) {
       inputUniqueKeys
     } else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 103b757..fa48ecf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -229,6 +229,10 @@ object FlinkBatchRuleSets {
 
     // rank rules
     FlinkLogicalRankRule.CONSTANT_RANGE_INSTANCE,
+    // transpose calc past rank to reduce rank input fields
+    CalcRankTransposeRule.INSTANCE,
+    // remove output of rank number when it is a constant
+    RankNumberColumnRemoveRule.INSTANCE,
 
     // calc rules
     FilterCalcMergeRule.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 9359ae1..1aacac8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.plan.rules
 
 import org.apache.flink.table.plan.nodes.logical._
-import org.apache.flink.table.plan.rules.logical.{CalcSnapshotTransposeRule, _}
+import org.apache.flink.table.plan.rules.logical._
 import org.apache.flink.table.plan.rules.physical.FlinkExpandConversionRule
 import org.apache.flink.table.plan.rules.physical.stream._
+
 import org.apache.calcite.rel.core.RelFactories
 import org.apache.calcite.rel.logical.{LogicalIntersect, LogicalMinus, LogicalUnion}
 import org.apache.calcite.rel.rules._
@@ -271,6 +272,10 @@ object FlinkStreamRuleSets {
   val LOGICAL_REWRITE: RuleSet = RuleSets.ofList(
     // transform over window to topn node
     FlinkLogicalRankRule.INSTANCE,
+    // transpose calc past rank to reduce rank input fields
+    CalcRankTransposeRule.INSTANCE,
+    // remove output of rank number when it is a constant
+    RankNumberColumnRemoveRule.INSTANCE,
     // split distinct aggregate to reduce data skew
     SplitAggregateRule.INSTANCE,
     // transpose calc past snapshot
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala
new file mode 100644
index 0000000..39f7b89
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.util.{FlinkRexUtil, RankUtil}
+import org.apache.flink.table.runtime.rank.VariableRankRange
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram}
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that transposes [[FlinkLogicalCalc]] past [[FlinkLogicalRank]]
+  * to reduce rank input fields.
+  */
+class CalcRankTransposeRule
+  extends RelOptRule(
+    operand(classOf[FlinkLogicalCalc],
+      operand(classOf[FlinkLogicalRank], any())),
+    "CalcRankTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0)
+    val rank: FlinkLogicalRank = call.rel(1)
+
+    val totalColumnCount = rank.getInput.getRowType.getFieldCount
+    // apply the rule only when calc could prune some columns
+    val pushableColumns = getPushableColumns(calc, rank)
+    pushableColumns.length < totalColumnCount
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: FlinkLogicalCalc = call.rel(0)
+    val rank: FlinkLogicalRank = call.rel(1)
+
+    val pushableColumns = getPushableColumns(calc, rank)
+
+    val rexBuilder = calc.getCluster.getRexBuilder
+    // create a new Calc to project columns of Rank's input
+    val innerProgram = createNewInnerCalcProgram(
+      pushableColumns,
+      rank.getInput.getRowType,
+      rexBuilder)
+    val newInnerCalc = calc.copy(calc.getTraitSet, rank.getInput, innerProgram)
+
+    // create a new Rank on top of new Calc
+    var fieldMapping = pushableColumns.zipWithIndex.toMap
+    val newRank = createNewRankOnCalc(fieldMapping, newInnerCalc, rank)
+
+    // create a new Calc on top of newRank if needed
+    if (rank.outputRankNumber) {
+      // append RankNumber field mapping
+      val oldRankFunFieldIdx = RankUtil.getRankNumberColumnIndex(rank)
+        .getOrElse(throw new TableException("This should not happen"))
+      val newRankFunFieldIdx = RankUtil.getRankNumberColumnIndex(newRank)
+        .getOrElse(throw new TableException("This should not happen"))
+      fieldMapping += (oldRankFunFieldIdx -> newRankFunFieldIdx)
+    }
+    val topProgram = createNewTopCalcProgram(
+      calc.getProgram,
+      fieldMapping,
+      newRank.getRowType,
+      rexBuilder)
+
+    val equiv = if (topProgram.isTrivial) {
+      // Ignore newTopCalc if its program is trivial
+      newRank
+    } else {
+      calc.copy(calc.getTraitSet, newRank, topProgram)
+    }
+    call.transformTo(equiv)
+  }
+
+  private def getPushableColumns(calc: Calc, rank: FlinkLogicalRank): Array[Int] = {
+    val usedFields = getUsedFields(calc.getProgram)
+    val rankFunFieldIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
+    val usedFieldsExcludeRankNumber = usedFields.filter(_ != rankFunFieldIndex)
+
+    val requiredFields = getKeyFields(rank)
+    usedFieldsExcludeRankNumber.union(requiredFields).distinct.sorted
+  }
+
+  private def getUsedFields(program: RexProgram): Array[Int] = {
+    val projects = program.getProjectList.map(program.expandLocalRef)
+    val condition = if (program.getCondition != null) {
+      program.expandLocalRef(program.getCondition)
+    } else {
+      null
+    }
+    RelOptUtil.InputFinder.bits(projects, condition).toArray
+  }
+
+  private def getKeyFields(rank: FlinkLogicalRank): Array[Int] = {
+    val partitionKey = rank.partitionKey.toArray
+    val orderKey = rank.orderKey.getFieldCollations.map(_.getFieldIndex).toArray
+    val uniqueKeys = rank.getCluster.getMetadataQuery.getUniqueKeys(rank.getInput)
+    val keysInUniqueKeys = if (uniqueKeys == null || uniqueKeys.isEmpty) {
+      Array[Int]()
+    } else {
+      uniqueKeys.flatMap(_.toArray).toArray
+    }
+    val rankRangeKey = rank.rankRange match {
+      case v: VariableRankRange => Array(v.getRankEndIndex)
+      case _ => Array[Int]()
+    }
+    // All keys, including partition key, order key, unique keys, VariableRankRange rankEndIndex
+    Set(partitionKey, orderKey, keysInUniqueKeys, rankRangeKey).flatten.toArray
+  }
+
+  private def createNewInnerCalcProgram(
+      projectedFields: Array[Int],
+      inputRowType: RelDataType,
+      rexBuilder: RexBuilder): RexProgram = {
+    val projects = projectedFields.map(RexInputRef.of(_, inputRowType))
+    val inputColNames = inputRowType.getFieldNames
+    val colNames = projectedFields.map(inputColNames.get)
+    RexProgram.create(inputRowType, projects.toList, null, colNames.toList, rexBuilder)
+  }
+
+  private def createNewTopCalcProgram(
+      oldTopProgram: RexProgram,
+      fieldMapping: Map[Int, Int],
+      inputRowType: RelDataType,
+      rexBuilder: RexBuilder): RexProgram = {
+    val oldProjects = oldTopProgram.getProjectList
+    val newProjects = oldProjects.map(oldTopProgram.expandLocalRef).map {
+      p => FlinkRexUtil.adjustInputRef(p, fieldMapping)
+    }
+    val oldCondition = oldTopProgram.getCondition
+    val newCondition = if (oldCondition != null) {
+      FlinkRexUtil.adjustInputRef(oldTopProgram.expandLocalRef(oldCondition), fieldMapping)
+    } else {
+      null
+    }
+    val colNames = oldTopProgram.getOutputRowType.getFieldNames
+    RexProgram.create(
+      inputRowType,
+      newProjects,
+      newCondition,
+      colNames,
+      rexBuilder)
+  }
+
+  private def createNewRankOnCalc(
+      fieldMapping: Map[Int, Int],
+      input: Calc,
+      rank: FlinkLogicalRank): FlinkLogicalRank = {
+    val newPartitionKey = rank.partitionKey.toArray.map(fieldMapping(_))
+    val oldOrderKey = rank.orderKey
+    val oldFieldCollations = oldOrderKey.getFieldCollations
+    val newFieldCollations = oldFieldCollations.map {
+      fc => fc.copy(fieldMapping(fc.getFieldIndex))
+    }
+    val newOrderKey = if (newFieldCollations.eq(oldFieldCollations)) {
+      oldOrderKey
+    } else {
+      RelCollations.of(newFieldCollations)
+    }
+    new FlinkLogicalRank(
+      rank.getCluster,
+      rank.getTraitSet,
+      input,
+      ImmutableBitSet.of(newPartitionKey: _*),
+      newOrderKey,
+      rank.rankType,
+      rank.rankRange,
+      rank.rankNumberType,
+      rank.outputRankNumber)
+  }
+}
+
+object CalcRankTransposeRule {
+  val INSTANCE = new CalcRankTransposeRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala
new file mode 100644
index 0000000..60ab14d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalRank}
+import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgramBuilder
+
+import java.math.{BigDecimal => JBigDecimal}
+
+/**
+  * Planner rule that removes the output column of rank number
+  * iff there is an equality condition for the rank column.
+  */
+class RankNumberColumnRemoveRule
+  extends RelOptRule(
+    operand(classOf[FlinkLogicalRank], any()),
+    "RankFunctionColumnRemoveRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val rank: FlinkLogicalRank = call.rel(0)
+    val isRowNumber = rank.rankType == RankType.ROW_NUMBER
+    val constantRowNumber = rank.rankRange match {
+      case range: ConstantRankRange => range.getRankStart == range.getRankEnd
+      case _ => false
+    }
+    isRowNumber && constantRowNumber && rank.outputRankNumber
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val rank: FlinkLogicalRank = call.rel(0)
+    val rowNumber = rank.rankRange.asInstanceOf[ConstantRankRange].getRankStart
+    val newRank = new FlinkLogicalRank(
+      rank.getCluster,
+      rank.getTraitSet,
+      rank.getInput,
+      rank.partitionKey,
+      rank.orderKey,
+      rank.rankType,
+      rank.rankRange,
+      rank.rankNumberType,
+      outputRankNumber = false)
+
+    val rexBuilder = rank.getCluster.getRexBuilder
+    val programBuilder = new RexProgramBuilder(newRank.getRowType, rexBuilder)
+    val fieldCount = rank.getRowType.getFieldCount
+    val fieldNames = rank.getRowType.getFieldNames
+    for (i <- 0 until fieldCount) {
+      if (i < fieldCount - 1) {
+        programBuilder.addProject(i, i, fieldNames.get(i))
+      } else {
+        val rowNumberLiteral = rexBuilder.makeBigintLiteral(new JBigDecimal(rowNumber))
+        programBuilder.addProject(i, rowNumberLiteral, fieldNames.get(i))
+      }
+    }
+
+    val rexProgram = programBuilder.getProgram
+    val calc = FlinkLogicalCalc.create(newRank, rexProgram)
+    call.transformTo(calc)
+  }
+}
+
+object RankNumberColumnRemoveRule {
+  val INSTANCE = new RankNumberColumnRemoveRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
index 1c54f98..4fa0f99 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
@@ -121,11 +121,11 @@ object CorrelateUtil {
       case (_, idx) => !projectableFieldSet.contains(idx)
     }.map {
       case (rex, _) =>
-        FlinkRexUtil.adjustInputRefs(rex, reservedFieldsMapping, newInputType)
+        FlinkRexUtil.adjustInputRef(rex, reservedFieldsMapping, newInputType)
     }.toList
 
     val shiftCondition = if (null != calcProgram.getCondition) {
-      FlinkRexUtil.adjustInputRefs(
+      FlinkRexUtil.adjustInputRef(
         calcProgram.expandLocalRef(calcProgram.getCondition),
         reservedFieldsMapping,
         newInputType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
index 18edfba..d82915b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
@@ -388,7 +388,7 @@ object FlinkRelMdUtil {
   def splitPredicateOnRank(
       rank: Rank,
       predicate: RexNode): (Option[RexNode], Option[RexNode]) = {
-    val rankFunColumnIndex = getRankFunctionColumnIndex(rank).getOrElse(-1)
+    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     if (predicate == null || predicate.isAlwaysTrue || rankFunColumnIndex < 0) {
       return (Some(predicate), None)
     }
@@ -419,16 +419,6 @@ object FlinkRelMdUtil {
     case _ => 100D // default value now
   }
 
-  def getRankFunctionColumnIndex(rank: Rank): Option[Int] = {
-    if (rank.outputRankNumber) {
-      require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount + 1)
-      Some(rank.getRowType.getFieldCount - 1)
-    } else {
-      require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount)
-      None
-    }
-  }
-
   /**
     * Returns [[RexInputRef]] index set of projects corresponding to the given column index.
     * The index will be set as -1 if the given column in project is not a [[RexInputRef]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
index 33ff255..3a2b064 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
@@ -300,22 +300,22 @@ object FlinkRexUtil {
   }
 
   /**
-    * Adjust the condition's field indices according to mapOldToNewIndex.
+    * Adjust the expression's field indices according to fieldsOldToNewIndexMapping.
     *
-    * @param c The condition to be adjusted.
+    * @param expr The expression to be adjusted.
     * @param fieldsOldToNewIndexMapping A map containing the mapping the old field indices to new
-    *   field indices.
+    *                                   field indices.
     * @param rowType The row type of the new output.
-    * @return Return new condition with new field indices.
+    * @return Return new expression with new field indices.
     */
-  private[flink] def adjustInputRefs(
-      c: RexNode,
+  private[flink] def adjustInputRef(
+      expr: RexNode,
       fieldsOldToNewIndexMapping: Map[Int, Int],
-      rowType: RelDataType) = c.accept(
+      rowType: RelDataType): RexNode = expr.accept(
     new RexShuttle() {
 
       override def visitInputRef(inputRef: RexInputRef): RexNode = {
-        assert(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
+        require(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
         val newIndex = fieldsOldToNewIndexMapping(inputRef.getIndex)
         val ref = RexInputRef.of(newIndex, rowType)
         if (ref.getIndex == inputRef.getIndex && (ref.getType eq inputRef.getType)) {
@@ -327,6 +327,25 @@ object FlinkRexUtil {
       }
     })
 
+  /**
+    * Adjust the expression's field indices according to fieldsOldToNewIndexMapping.
+    *
+    * @param expr The expression to be adjusted.
+    * @param fieldsOldToNewIndexMapping A map containing the mapping the old field indices to new
+    *                                   field indices.
+    * @return Return new expression with new field indices.
+    */
+  private[flink] def adjustInputRef(
+      expr: RexNode,
+      fieldsOldToNewIndexMapping: Map[Int, Int]): RexNode = expr.accept(
+    new RexShuttle() {
+      override def visitInputRef(inputRef: RexInputRef): RexNode = {
+        require(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
+        val newIndex = fieldsOldToNewIndexMapping(inputRef.getIndex)
+        new RexInputRef(newIndex, inputRef.getType)
+      }
+    })
+
   private class EquivalentExprShuttle(rexBuilder: RexBuilder) extends RexShuttle {
     private val equiExprMap = mutable.HashMap[String, RexNode]()
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
index 33e2c6d..ebb4c8b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
@@ -294,4 +294,14 @@ object RankUtil {
     literals.head
   }
 
+  /** Returns the index of the rank number column, or None if the rank does not output it. */
+  def getRankNumberColumnIndex(rank: Rank): Option[Int] = {
+    val outputFieldCnt = rank.getRowType.getFieldCount
+    val inputFieldCnt = rank.getInput.getRowType.getFieldCount
+    // when the rank number is emitted, the rank has exactly one more field than its input
+    val rankNumberCnt = if (rank.outputRankNumber) 1 else 0
+    require(outputFieldCnt == inputFieldCnt + rankNumberCnt)
+    if (rank.outputRankNumber) Some(outputFieldCnt - 1) else None
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml
new file mode 100644
index 0000000..33b47e8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml
@@ -0,0 +1,277 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testNotTranspose">
+    <Resource name="sql">
+      <![CDATA[
+SELECT category, max_price, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+  FROM (
+     SELECT category, shopId, max(price) as max_price
+     FROM T
+     GROUP BY category, shopId
+  ))
+WHERE rank_num <= 3
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(category=[$0], max_price=[$2], rank_num=[$3])
++- LogicalFilter(condition=[<=($3, 3)])
+   +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)])
+         +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[category, max_price, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
+   +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)])
+      +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectRankNumber">
+    <Resource name="sql">
+      <![CDATA[
+SELECT rank_num, rowtime, a, rank_num, a, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE  rank_num <= 2
+    ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(rank_num=[$4], rowtime=[$3], a=[$0], rank_num0=[$4], a0=[$0], rank_num1=[$4])
++- LogicalFilter(condition=[<=($4, 2)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[rank_num1 AS rank_num, rowtime, a, rank_num1 AS rank_num0, a AS a0, rank_num1])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, rank_num1])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPruneOrderKeys">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num = 1
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($4, 1)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPrunePartitionKeys">
+    <Resource name="sql">
+      <![CDATA[
+SELECT rowtime FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num = 1
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(rowtime=[$3])
++- LogicalFilter(condition=[=($4, 1)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[rowtime])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPushCalcWithConditionIntoRank">
+    <Resource name="sql">
+      <![CDATA[
+SELECT rowtime, c FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE  rank_num <= 2 AND a > 10
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(rowtime=[$3], c=[$2])
++- LogicalFilter(condition=[AND(<=($4, 2), >($0, 10))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[rowtime, c], where=[>(a, 10)])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, c, rowtime])
+   +- FlinkLogicalCalc(select=[a, c, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPruneRankNumber">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, rowtime FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num <= 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], rowtime=[$3])
++- LogicalFilter(condition=[<=($4, 2)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
++- FlinkLogicalCalc(select=[a, rowtime])
+   +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPruneUniqueKeys">
+    <Resource name="sql">
+      <![CDATA[
+SELECT category, max_price, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+  FROM (
+     SELECT category, shopId, max(price) as max_price, min(price) as min_price
+     FROM T
+     GROUP BY category, shopId
+  ))
+WHERE rank_num <= 3
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(category=[$0], max_price=[$2], rank_num=[$4])
++- LogicalFilter(condition=[<=($4, 3)])
+   +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], min_price=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+         +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[category, max_price, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
+   +- FlinkLogicalCalc(select=[category, shopId, max_price])
+      +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+         +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPruneUnusedProject">
+    <Resource name="sql">
+      <![CDATA[
+SELECT category, shopId, max_price, rank_num
+FROM (
+  SELECT category, shopId, max_price,
+      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+  FROM (
+     SELECT category, shopId, max(price) as max_price, min(price) as min_price
+     FROM T
+     GROUP BY category, shopId
+  ))
+WHERE rank_num <= 3
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[$3])
++- LogicalFilter(condition=[<=($3, 3)])
+   +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+         +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
++- FlinkLogicalCalc(select=[category, shopId, max_price])
+   +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+      +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTrivialCalcIsRemoved">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, rowtime, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE  rank_num <= 2
+    ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], rowtime=[$3], rank_num=[$4])
++- LogicalFilter(condition=[<=($4, 2)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
++- FlinkLogicalCalc(select=[a, rowtime])
+   +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
index 8324972..70bf902 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
@@ -89,8 +89,9 @@ LogicalProject(a=[$0], b=[$1], rk1=[$2], rk2=[$3])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, rk2 AS rk1, rk2])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, rk2])
-   +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, rk2])
+   +- FlinkLogicalCalc(select=[a, b])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -161,8 +162,9 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(a, 10)])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0])
-   +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])
+   +- FlinkLogicalCalc(select=[a, b])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -184,8 +186,8 @@ LogicalProject(a=[$0], b=[$1], rn=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])
++- FlinkLogicalCalc(select=[a, b])
    +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -304,8 +306,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, w0$o0])
++- FlinkLogicalCalc(select=[a, b])
    +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
new file mode 100644
index 0000000..754e016
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCannotRemoveRankNumberColumn1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, rank_num FROM (
+  SELECT *,
+      RANK() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[RANK() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, w0$o0])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCannotRemoveRankNumberColumn2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 3
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 3))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCannotRemoveRankNumberColumn3">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCouldRemoveRankNumberColumn">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, rank_num FROM (
+  SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+      +- LogicalTableScan(table=[[MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, 1:BIGINT AS w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+   +- FlinkLogicalCalc(select=[a, rowtime])
+      +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
index c73e694..215339f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
@@ -89,10 +89,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, w0$o0])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0])
+Calc(select=[a, b, c, 1:BIGINT AS w0$o0])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -115,9 +116,10 @@ LogicalProject(a=[$0], rk=[$1], b=[$2], c=[$3])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, rk, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[a], orderBy=[a ASC], select=[a, b, c, proctime, rowtime, rk])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[a], orderBy=[a ASC], select=[a, b, c, rk])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -143,9 +145,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime ASC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime ASC], select=[a, b, c, proctime])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, proctime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -171,9 +174,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime DESC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime DESC], select=[a, b, c, proctime])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, proctime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -201,7 +205,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
 Calc(select=[a, b, c])
 +- Deduplicate(keepLastRow=[false], key=[a], order=[PROCTIME])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, proctime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -229,7 +234,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
 Calc(select=[a, b, c])
 +- Deduplicate(keepLastRow=[true], key=[a], order=[PROCTIME])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, proctime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -255,9 +261,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, rowtime])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, rowtime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -283,9 +290,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, rowtime])
    +- Exchange(distribution=[hash[a]])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+      +- Calc(select=[a, b, c, rowtime])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -310,9 +318,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-   +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+   +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
       +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
 ]]>
     </Resource>
@@ -338,9 +346,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-   +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+   +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
       +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
 ]]>
     </Resource>
@@ -761,9 +769,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
+      +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+         +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -867,9 +876,9 @@ LogicalProject(row_num=[$3], a=[$0], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, row_num], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, row_num], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
-      +- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)], updateAsRetraction=[false], accMode=[Acc])
+      +- Calc(select=[a, b, c], where=[>(c, 1000)], updateAsRetraction=[false], accMode=[Acc])
          +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala
new file mode 100644
index 0000000..8843e6b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.FlinkStreamProgram
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[CalcRankTransposeRule]].
+  */
+class CalcRankTransposeRuleTest extends TableTestBase {
+  private val util = streamTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildStreamProgram(FlinkStreamProgram.PHYSICAL)
+
+    util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime)
+    util.addTableSource[(String, Int, String)]("T", 'category, 'shopId, 'price)
+  }
+
+  @Test
+  def testPruneOrderKeys(): Unit = {
+    // Push Calc into Rank, project column (a, rowtime), prune column (b, c)
+    val sql =
+      """
+        |SELECT a FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num = 1
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testPrunePartitionKeys(): Unit = {
+    // Push Calc into Rank, project column (a, rowtime), prune column (b, c)
+    val sql =
+      """
+        |SELECT rowtime FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num = 1
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testPruneUniqueKeys(): Unit = {
+    // Push Calc into Rank, project column (category, shopId, max_price), prune column (min_price)
+    val sql =
+      """
+        |SELECT category, max_price, rank_num FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+        |  FROM (
+        |     SELECT category, shopId, max(price) as max_price, min(price) as min_price
+        |     FROM T
+        |     GROUP BY category, shopId
+        |  ))
+        |WHERE rank_num <= 3
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testNotTranspose(): Unit = {
+    // Do not transpose Calc into Rank because there are no columns to prune
+    val sql =
+      """
+        |SELECT category, max_price, rank_num FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+        |  FROM (
+        |     SELECT category, shopId, max(price) as max_price
+        |     FROM T
+        |     GROUP BY category, shopId
+        |  ))
+        |WHERE rank_num <= 3
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testPruneRankNumber(): Unit = {
+    // Push Calc into Rank, project column (a, rowtime), prune column (b, c)
+    val sql =
+      """
+        |SELECT a, rowtime FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num <= 2
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testProjectRankNumber(): Unit = {
+    // Push Calc into Rank, project column (a, rowtime), prune column (b, c)
+    // Need a new Calc on top of Rank to preserve equivalence
+    val sql =
+    """
+      |SELECT rank_num, rowtime, a, rank_num, a, rank_num FROM (
+      |  SELECT *,
+      |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+      |  FROM MyTable)
+      |WHERE  rank_num <= 2
+    """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testTrivialCalcIsRemoved(): Unit = {
+    // Push Calc into Rank, project column (a, rowtime), prune column (b, c)
+    // Does not need a New Calc on top of Rank because it is trivial
+    val sql =
+    """
+      |SELECT a, rowtime, rank_num FROM (
+      |  SELECT *,
+      |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+      |  FROM MyTable)
+      |WHERE  rank_num <= 2
+    """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testPushCalcWithConditionIntoRank(): Unit = {
+    // Push Calc into Rank even with a filter condition, project (rowtime, c, a), prune (b)
+    val sql =
+      """
+        |SELECT rowtime, c FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE  rank_num <= 2 AND a > 10
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testPruneUnusedProject(): Unit = {
+    // Push Calc into Rank, project column (category, shopId, max_price), prune column (min_price)
+    val sql =
+      """
+        |SELECT category, shopId, max_price, rank_num
+        |FROM (
+        |  SELECT category, shopId, max_price,
+        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+        |  FROM (
+        |     SELECT category, shopId, max(price) as max_price, min(price) as min_price
+        |     FROM T
+        |     GROUP BY category, shopId
+        |  ))
+        |WHERE rank_num <= 3
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala
new file mode 100644
index 0000000..5d48714
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.FlinkStreamProgram
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[RankNumberColumnRemoveRule]].
+  */
+class RankNumberColumnRemoveRuleTest extends TableTestBase {
+  private val util = streamTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildStreamProgram(FlinkStreamProgram.PHYSICAL)
+
+    util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime)
+  }
+
+  @Test
+  def testCannotRemoveRankNumberColumn1(): Unit = {
+    val sql =
+      """
+        |SELECT a, rank_num FROM (
+        |  SELECT *,
+        |      RANK() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num >= 1 AND rank_num < 2
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testCannotRemoveRankNumberColumn2(): Unit = {
+    val sql =
+      """
+        |SELECT a, rank_num FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num >= 1 AND rank_num < 3
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testCannotRemoveRankNumberColumn3(): Unit = {
+    // The Rank does not output the rank number, so this rule will not be matched
+    val sql =
+      """
+        |SELECT a FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num >= 1 AND rank_num < 2
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testCouldRemoveRankNumberColumn(): Unit = {
+    val sql =
+      """
+        |SELECT a, rank_num FROM (
+        |  SELECT *,
+        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+        |  FROM MyTable)
+        |WHERE rank_num >= 1 AND rank_num < 2
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+}