Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/22 01:08:11 UTC
[flink] branch master updated: [FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e6f968c [FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule
e6f968c is described below
commit e6f968c504ac323b8ae5da8a9545aae3ac69197c
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 22 09:07:48 2019 +0800
[FLINK-12524] [table-planner-blink] Introduce CalcRankTransposeRule & RankNumberColumnRemoveRule
This closes #8460
---
.../plan/metadata/FlinkRelMdColumnInterval.scala | 4 +-
.../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 +-
.../plan/metadata/FlinkRelMdDistinctRowCount.scala | 4 +-
.../plan/metadata/FlinkRelMdPopulationSize.scala | 4 +-
.../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 +-
.../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 +-
.../table/plan/rules/FlinkBatchRuleSets.scala | 4 +
.../table/plan/rules/FlinkStreamRuleSets.scala | 7 +-
.../plan/rules/logical/CalcRankTransposeRule.scala | 197 +++++++++++++++
.../rules/logical/RankNumberColumnRemoveRule.scala | 84 +++++++
.../flink/table/plan/util/CorrelateUtil.scala | 4 +-
.../flink/table/plan/util/FlinkRelMdUtil.scala | 12 +-
.../flink/table/plan/util/FlinkRexUtil.scala | 35 ++-
.../apache/flink/table/plan/util/RankUtil.scala | 10 +
.../rules/logical/CalcRankTransposeRuleTest.xml | 277 +++++++++++++++++++++
.../FlinkLogicalRankRuleForRangeEndTest.xml | 18 +-
.../logical/RankNumberColumnRemoveRuleTest.xml | 127 ++++++++++
.../flink/table/plan/stream/sql/RankTest.xml | 59 +++--
.../rules/logical/CalcRankTransposeRuleTest.scala | 190 ++++++++++++++
.../logical/RankNumberColumnRemoveRuleTest.scala | 96 +++++++
20 files changed, 1077 insertions(+), 67 deletions(-)
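For context before the diff: CalcRankTransposeRule pushes a projecting Calc below a Rank so that fewer fields enter the rank operator, and RankNumberColumnRemoveRule replaces a rank number that the rank range pins to a single value with a literal. A minimal sketch of the Top-N shape these rules target, assuming a hypothetical table environment `tEnv` with a registered table `shop_sales(category, shopId, price)` (neither is part of this commit; the SQL mirrors the new tests):

    val topN = tEnv.sqlQuery(
      """
        |SELECT category, max_price, rank_num FROM (
        |  SELECT *,
        |    ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) AS rank_num
        |  FROM (
        |    SELECT category, shopId, MAX(price) AS max_price
        |    FROM shop_sales
        |    GROUP BY category, shopId))
        |WHERE rank_num <= 3
      """.stripMargin)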
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
index d75558c..cc55906 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
import org.apache.flink.table.plan.stats._
-import org.apache.flink.table.plan.util.{AggregateUtil, ColumnIntervalUtil, FlinkRelMdUtil, FlinkRelOptUtil}
+import org.apache.flink.table.plan.util.{AggregateUtil, ColumnIntervalUtil, FlinkRelOptUtil, RankUtil}
import org.apache.flink.table.runtime.rank.{ConstantRankRange, VariableRankRange}
import org.apache.flink.util.Preconditions
@@ -321,7 +321,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
mq: RelMetadataQuery,
index: Int): ValueInterval = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (index == rankFunColumnIndex) {
rank.rankRange match {
case r: ConstantRankRange => ValueInterval(r.getRankStart, r.getRankEnd)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 25754a9..1279a80 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
import org.apache.flink.table.runtime.rank.RankType
import org.apache.flink.table.sources.TableSource
@@ -267,7 +267,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
val input = rank.getInput
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (rankFunColumnIndex < 0) {
mq.areColumnsUnique(input, columns, ignoreNulls)
} else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
index 049bdaa..5b33318 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCount.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.{PlannerConfigOptions, TableException}
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, FlinkRexUtil}
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, FlinkRexUtil, RankUtil}
import org.apache.flink.table.{JArrayList, JDouble}
import org.apache.calcite.plan.RelOptUtil
@@ -254,7 +254,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
mq: RelMetadataQuery,
groupKey: ImmutableBitSet,
predicate: RexNode): JDouble = {
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
val newGroupKey = groupKey.clearIf(rankFunColumnIndex, rankFunColumnIndex > 0)
val (nonRankPred, rankPred) = FlinkRelMdUtil.splitPredicateOnRank(rank, predicate)
val inputNdv: JDouble = if (newGroupKey.nonEmpty) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
index d927a6d..389d36a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSize.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
import org.apache.flink.table.plan.nodes.physical.batch._
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
import org.apache.flink.table.{JArrayList, JDouble}
import org.apache.calcite.plan.volcano.RelSubset
@@ -179,7 +179,7 @@ class FlinkRelMdPopulationSize private extends MetadataHandler[BuiltInMetadata.P
rel: Rank,
mq: RelMetadataQuery,
groupKey: ImmutableBitSet): JDouble = {
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rel).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
if (rankFunColumnIndex < 0 || !groupKey.toArray.contains(rankFunColumnIndex)) {
mq.getPopulationSize(rel.getInput, groupKey)
} else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
index fe157e8..b559fe0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.plan.metadata.FlinkMetadata.UniqueGroups
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank}
import org.apache.flink.table.plan.nodes.physical.batch._
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
@@ -179,7 +179,7 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] {
columns: ImmutableBitSet): ImmutableBitSet = {
val columnList = columns.toList
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rank).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
val columnSkipRankCol = columnList.filter(_ != rankFunColumnIndex)
if (columnSkipRankCol.isEmpty) {
return columns
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
index 2c87feb..f1c9060 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.plan.util.FlinkRelMdUtil
+import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RankUtil}
import org.apache.flink.table.runtime.rank.RankType
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.{JArrayList, JBoolean, JHashMap, JHashSet, JList, JSet}
@@ -227,7 +227,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
- val rankFunColumnIndex = FlinkRelMdUtil.getRankFunctionColumnIndex(rel).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
if (rankFunColumnIndex < 0) {
inputUniqueKeys
} else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 103b757..fa48ecf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -229,6 +229,10 @@ object FlinkBatchRuleSets {
// rank rules
FlinkLogicalRankRule.CONSTANT_RANGE_INSTANCE,
+ // transpose calc past rank to reduce rank input fields
+ CalcRankTransposeRule.INSTANCE,
+ // remove output of rank number when it is a constant
+ RankNumberColumnRemoveRule.INSTANCE,
// calc rules
FilterCalcMergeRule.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 9359ae1..1aacac8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -19,9 +19,10 @@
package org.apache.flink.table.plan.rules
import org.apache.flink.table.plan.nodes.logical._
-import org.apache.flink.table.plan.rules.logical.{CalcSnapshotTransposeRule, _}
+import org.apache.flink.table.plan.rules.logical._
import org.apache.flink.table.plan.rules.physical.FlinkExpandConversionRule
import org.apache.flink.table.plan.rules.physical.stream._
+
import org.apache.calcite.rel.core.RelFactories
import org.apache.calcite.rel.logical.{LogicalIntersect, LogicalMinus, LogicalUnion}
import org.apache.calcite.rel.rules._
@@ -271,6 +272,10 @@ object FlinkStreamRuleSets {
val LOGICAL_REWRITE: RuleSet = RuleSets.ofList(
// transform over window to topn node
FlinkLogicalRankRule.INSTANCE,
+ // transpose calc past rank to reduce rank input fields
+ CalcRankTransposeRule.INSTANCE,
+ // remove output of rank number when it is a constant
+ RankNumberColumnRemoveRule.INSTANCE,
// split distinct aggregate to reduce data skew
SplitAggregateRule.INSTANCE,
// transpose calc past snapshot
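Both rule sets register the two new rules directly after FlinkLogicalRankRule, so the transpose runs once the over window has been rewritten into a Rank. An illustration-only sketch (not committed code) of grouping them the same way with Calcite's RuleSets:

    import org.apache.calcite.tools.{RuleSet, RuleSets}

    // the logical-rewrite additions from this commit, in registration order
    val rankRewriteRules: RuleSet = RuleSets.ofList(
      CalcRankTransposeRule.INSTANCE,
      RankNumberColumnRemoveRule.INSTANCE)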
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala
new file mode 100644
index 0000000..39f7b89
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRule.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.util.{FlinkRexUtil, RankUtil}
+import org.apache.flink.table.runtime.rank.VariableRankRange
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram}
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that transposes [[FlinkLogicalCalc]] past [[FlinkLogicalRank]]
+ * to reduce rank input fields.
+ */
+class CalcRankTransposeRule
+ extends RelOptRule(
+ operand(classOf[FlinkLogicalCalc],
+ operand(classOf[FlinkLogicalRank], any())),
+ "CalcRankTransposeRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0)
+ val rank: FlinkLogicalRank = call.rel(1)
+
+ val totalColumnCount = rank.getInput.getRowType.getFieldCount
+ // apply the rule only when calc could prune some columns
+ val pushableColumns = getPushableColumns(calc, rank)
+ pushableColumns.length < totalColumnCount
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val calc: FlinkLogicalCalc = call.rel(0)
+ val rank: FlinkLogicalRank = call.rel(1)
+
+ val pushableColumns = getPushableColumns(calc, rank)
+
+ val rexBuilder = calc.getCluster.getRexBuilder
+ // create a new Calc to project columns of Rank's input
+ val innerProgram = createNewInnerCalcProgram(
+ pushableColumns,
+ rank.getInput.getRowType,
+ rexBuilder)
+ val newInnerCalc = calc.copy(calc.getTraitSet, rank.getInput, innerProgram)
+
+ // create a new Rank on top of new Calc
+ var fieldMapping = pushableColumns.zipWithIndex.toMap
+ val newRank = createNewRankOnCalc(fieldMapping, newInnerCalc, rank)
+
+ // create a new Calc on top of newRank if needed
+ if (rank.outputRankNumber) {
+ // append RankNumber field mapping
+ val oldRankFunFieldIdx = RankUtil.getRankNumberColumnIndex(rank)
+ .getOrElse(throw new TableException("This should not happen"))
+ val newRankFunFieldIdx = RankUtil.getRankNumberColumnIndex(newRank)
+ .getOrElse(throw new TableException("This should not happen"))
+ fieldMapping += (oldRankFunFieldIdx -> newRankFunFieldIdx)
+ }
+ val topProgram = createNewTopCalcProgram(
+ calc.getProgram,
+ fieldMapping,
+ newRank.getRowType,
+ rexBuilder)
+
+ val equiv = if (topProgram.isTrivial) {
+ // Ignore the new top Calc if its program is trivial
+ newRank
+ } else {
+ calc.copy(calc.getTraitSet, newRank, topProgram)
+ }
+ call.transformTo(equiv)
+ }
+
+ private def getPushableColumns(calc: Calc, rank: FlinkLogicalRank): Array[Int] = {
+ val usedFields = getUsedFields(calc.getProgram)
+ val rankFunFieldIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
+ val usedFieldsExcludeRankNumber = usedFields.filter(_ != rankFunFieldIndex)
+
+ val requiredFields = getKeyFields(rank)
+ usedFieldsExcludeRankNumber.union(requiredFields).distinct.sorted
+ }
+
+ private def getUsedFields(program: RexProgram): Array[Int] = {
+ val projects = program.getProjectList.map(program.expandLocalRef)
+ val condition = if (program.getCondition != null) {
+ program.expandLocalRef(program.getCondition)
+ } else {
+ null
+ }
+ RelOptUtil.InputFinder.bits(projects, condition).toArray
+ }
+
+ private def getKeyFields(rank: FlinkLogicalRank): Array[Int] = {
+ val partitionKey = rank.partitionKey.toArray
+ val orderKey = rank.orderKey.getFieldCollations.map(_.getFieldIndex).toArray
+ val uniqueKeys = rank.getCluster.getMetadataQuery.getUniqueKeys(rank.getInput)
+ val keysInUniqueKeys = if (uniqueKeys == null || uniqueKeys.isEmpty) {
+ Array[Int]()
+ } else {
+ uniqueKeys.flatMap(_.toArray).toArray
+ }
+ val rankRangeKey = rank.rankRange match {
+ case v: VariableRankRange => Array(v.getRankEndIndex)
+ case _ => Array[Int]()
+ }
+ // All keys, including partition key, order key, unique keys, and VariableRankRange's rankEndIndex
+ Set(partitionKey, orderKey, keysInUniqueKeys, rankRangeKey).flatten.toArray
+ }
+
+ private def createNewInnerCalcProgram(
+ projectedFields: Array[Int],
+ inputRowType: RelDataType,
+ rexBuilder: RexBuilder): RexProgram = {
+ val projects = projectedFields.map(RexInputRef.of(_, inputRowType))
+ val inputColNames = inputRowType.getFieldNames
+ val colNames = projectedFields.map(inputColNames.get)
+ RexProgram.create(inputRowType, projects.toList, null, colNames.toList, rexBuilder)
+ }
+
+ private def createNewTopCalcProgram(
+ oldTopProgram: RexProgram,
+ fieldMapping: Map[Int, Int],
+ inputRowType: RelDataType,
+ rexBuilder: RexBuilder): RexProgram = {
+ val oldProjects = oldTopProgram.getProjectList
+ val newProjects = oldProjects.map(oldTopProgram.expandLocalRef).map {
+ p => FlinkRexUtil.adjustInputRef(p, fieldMapping)
+ }
+ val oldCondition = oldTopProgram.getCondition
+ val newCondition = if (oldCondition != null) {
+ FlinkRexUtil.adjustInputRef(oldTopProgram.expandLocalRef(oldCondition), fieldMapping)
+ } else {
+ null
+ }
+ val colNames = oldTopProgram.getOutputRowType.getFieldNames
+ RexProgram.create(
+ inputRowType,
+ newProjects,
+ newCondition,
+ colNames,
+ rexBuilder)
+ }
+
+ private def createNewRankOnCalc(
+ fieldMapping: Map[Int, Int],
+ input: Calc,
+ rank: FlinkLogicalRank): FlinkLogicalRank = {
+ val newPartitionKey = rank.partitionKey.toArray.map(fieldMapping(_))
+ val oldOrderKey = rank.orderKey
+ val oldFieldCollations = oldOrderKey.getFieldCollations
+ val newFieldCollations = oldFieldCollations.map {
+ fc => fc.copy(fieldMapping(fc.getFieldIndex))
+ }
+ val newOrderKey = if (newFieldCollations.eq(oldFieldCollations)) {
+ oldOrderKey
+ } else {
+ RelCollations.of(newFieldCollations)
+ }
+ new FlinkLogicalRank(
+ rank.getCluster,
+ rank.getTraitSet,
+ input,
+ ImmutableBitSet.of(newPartitionKey: _*),
+ newOrderKey,
+ rank.rankType,
+ rank.rankRange,
+ rank.rankNumberType,
+ rank.outputRankNumber)
+ }
+}
+
+object CalcRankTransposeRule {
+ val INSTANCE = new CalcRankTransposeRule
+}
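To make the pruning decision above concrete: the rule keeps the union of the fields the Calc's program references (minus the rank number column, which the new Rank regenerates) and the fields the Rank itself needs as keys (plus a VariableRankRange's rank-end field, omitted here). An illustration-only rendering in plain Scala, with hypothetical names and no Calcite types:

    def pushableColumns(
        usedFields: Array[Int],      // fields referenced by the top Calc's program
        rankNumberIndex: Int,        // -1 if the rank number is not output
        partitionKey: Array[Int],
        orderKey: Array[Int],
        uniqueKeys: Array[Int]): Array[Int] = {
      val usedExcludingRankNumber = usedFields.filter(_ != rankNumberIndex)
      // keys must survive even when the Calc never references them
      (usedExcludingRankNumber ++ partitionKey ++ orderKey ++ uniqueKeys).distinct.sorted
    }

    // testPruneOrderKeys: SELECT a FROM (... PARTITION BY a ORDER BY rowtime ...) WHERE rank_num = 1
    // uses fields {a=0, rank_num=4} but must also keep the order key:
    // pushableColumns(Array(0, 4), 4, Array(0), Array(3), Array()) == Array(0, 3)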
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala
new file mode 100644
index 0000000..60ab14d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRule.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalRank}
+import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgramBuilder
+
+import java.math.{BigDecimal => JBigDecimal}
+
+/**
+ * Planner rule that removes the output column of the rank number
+ * iff there is an equality condition on the rank number column.
+ */
+class RankNumberColumnRemoveRule
+ extends RelOptRule(
+ operand(classOf[FlinkLogicalRank], any()),
+ "RankFunctionColumnRemoveRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val rank: FlinkLogicalRank = call.rel(0)
+ val isRowNumber = rank.rankType == RankType.ROW_NUMBER
+ val constantRowNumber = rank.rankRange match {
+ case range: ConstantRankRange => range.getRankStart == range.getRankEnd
+ case _ => false
+ }
+ isRowNumber && constantRowNumber && rank.outputRankNumber
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val rank: FlinkLogicalRank = call.rel(0)
+ val rowNumber = rank.rankRange.asInstanceOf[ConstantRankRange].getRankStart
+ val newRank = new FlinkLogicalRank(
+ rank.getCluster,
+ rank.getTraitSet,
+ rank.getInput,
+ rank.partitionKey,
+ rank.orderKey,
+ rank.rankType,
+ rank.rankRange,
+ rank.rankNumberType,
+ outputRankNumber = false)
+
+ val rexBuilder = rank.getCluster.getRexBuilder
+ val programBuilder = new RexProgramBuilder(newRank.getRowType, rexBuilder)
+ val fieldCount = rank.getRowType.getFieldCount
+ val fieldNames = rank.getRowType.getFieldNames
+ for (i <- 0 until fieldCount) {
+ if (i < fieldCount - 1) {
+ programBuilder.addProject(i, i, fieldNames.get(i))
+ } else {
+ val rowNumberLiteral = rexBuilder.makeBigintLiteral(new JBigDecimal(rowNumber))
+ programBuilder.addProject(i, rowNumberLiteral, fieldNames.get(i))
+ }
+ }
+
+ val rexProgram = programBuilder.getProgram
+ val calc = FlinkLogicalCalc.create(newRank, rexProgram)
+ call.transformTo(calc)
+ }
+}
+
+object RankNumberColumnRemoveRule {
+ val INSTANCE = new RankNumberColumnRemoveRule
+}
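The removal hinges on one observation: for ROW_NUMBER with a ConstantRankRange whose rankStart equals rankEnd, every emitted row carries the same rank number, so the Rank can stop outputting it and a trailing Calc re-adds it as a BIGINT literal. An illustration-only reduction of the match condition to plain Scala (hypothetical types, not committed code):

    final case class ConstantRange(rankStart: Long, rankEnd: Long)

    def canRemoveRankNumber(
        isRowNumber: Boolean,      // rankType == RankType.ROW_NUMBER
        outputRankNumber: Boolean, // the rank number is currently projected
        range: ConstantRange): Boolean =
      isRowNumber && outputRankNumber && range.rankStart == range.rankEnd

    // e.g. WHERE rank_num >= 1 AND rank_num < 2 normalizes to rankStart = rankEnd = 1,
    // yielding Calc(select=[a, 1:BIGINT AS w0$o0]) over a Rank that no longer
    // emits the number (see testCouldRemoveRankNumberColumn below).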
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
index 1c54f98..4fa0f99 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CorrelateUtil.scala
@@ -121,11 +121,11 @@ object CorrelateUtil {
case (_, idx) => !projectableFieldSet.contains(idx)
}.map {
case (rex, _) =>
- FlinkRexUtil.adjustInputRefs(rex, reservedFieldsMapping, newInputType)
+ FlinkRexUtil.adjustInputRef(rex, reservedFieldsMapping, newInputType)
}.toList
val shiftCondition = if (null != calcProgram.getCondition) {
- FlinkRexUtil.adjustInputRefs(
+ FlinkRexUtil.adjustInputRef(
calcProgram.expandLocalRef(calcProgram.getCondition),
reservedFieldsMapping,
newInputType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
index 18edfba..d82915b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
@@ -388,7 +388,7 @@ object FlinkRelMdUtil {
def splitPredicateOnRank(
rank: Rank,
predicate: RexNode): (Option[RexNode], Option[RexNode]) = {
- val rankFunColumnIndex = getRankFunctionColumnIndex(rank).getOrElse(-1)
+ val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (predicate == null || predicate.isAlwaysTrue || rankFunColumnIndex < 0) {
return (Some(predicate), None)
}
@@ -419,16 +419,6 @@ object FlinkRelMdUtil {
case _ => 100D // default value now
}
- def getRankFunctionColumnIndex(rank: Rank): Option[Int] = {
- if (rank.outputRankNumber) {
- require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount + 1)
- Some(rank.getRowType.getFieldCount - 1)
- } else {
- require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount)
- None
- }
- }
-
/**
* Returns [[RexInputRef]] index set of projects corresponding to the given column index.
* The index will be set as -1 if the given column in project is not a [[RexInputRef]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
index 33ff255..3a2b064 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRexUtil.scala
@@ -300,22 +300,22 @@ object FlinkRexUtil {
}
/**
- * Adjust the condition's field indices according to mapOldToNewIndex.
+ * Adjust the expression's field indices according to fieldsOldToNewIndexMapping.
*
- * @param c The condition to be adjusted.
+ * @param expr The expression to be adjusted.
* @param fieldsOldToNewIndexMapping A map containing the mapping the old field indices to new
- * field indices.
+ * field indices.
* @param rowType The row type of the new output.
- * @return Return new condition with new field indices.
+ * @return The new expression with adjusted field indices.
*/
- private[flink] def adjustInputRefs(
- c: RexNode,
+ private[flink] def adjustInputRef(
+ expr: RexNode,
fieldsOldToNewIndexMapping: Map[Int, Int],
- rowType: RelDataType) = c.accept(
+ rowType: RelDataType): RexNode = expr.accept(
new RexShuttle() {
override def visitInputRef(inputRef: RexInputRef): RexNode = {
- assert(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
+ require(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
val newIndex = fieldsOldToNewIndexMapping(inputRef.getIndex)
val ref = RexInputRef.of(newIndex, rowType)
if (ref.getIndex == inputRef.getIndex && (ref.getType eq inputRef.getType)) {
@@ -327,6 +327,25 @@ object FlinkRexUtil {
}
})
+ /**
+ * Adjust the expression's field indices according to fieldsOldToNewIndexMapping.
+ *
+ * @param expr The expression to be adjusted.
+ * @param fieldsOldToNewIndexMapping A map from the old field indices to the
+ * new field indices.
+ * @return The new expression with adjusted field indices.
+ */
+ private[flink] def adjustInputRef(
+ expr: RexNode,
+ fieldsOldToNewIndexMapping: Map[Int, Int]): RexNode = expr.accept(
+ new RexShuttle() {
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ require(fieldsOldToNewIndexMapping.containsKey(inputRef.getIndex))
+ val newIndex = fieldsOldToNewIndexMapping(inputRef.getIndex)
+ new RexInputRef(newIndex, inputRef.getType)
+ }
+ })
+
private class EquivalentExprShuttle(rexBuilder: RexBuilder) extends RexShuttle {
private val equiExprMap = mutable.HashMap[String, RexNode]()
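The new two-argument adjustInputRef overload rewrites input-ref indices while keeping each ref's original type, whereas the renamed three-argument variant re-resolves the type from the supplied row type. A toy model of the index rewrite (illustration only; the real code walks Calcite RexNodes with a RexShuttle):

    sealed trait Expr
    final case class InputRef(index: Int) extends Expr
    final case class Call(op: String, operands: List[Expr]) extends Expr

    def adjustInputRef(expr: Expr, mapping: Map[Int, Int]): Expr = expr match {
      case InputRef(i)        => InputRef(mapping(i)) // the real overload require()s that i is mapped
      case Call(op, operands) => Call(op, operands.map(adjustInputRef(_, mapping)))
    }

    // After pruning leaves old fields {2, 3} at new positions {0, 1}:
    // adjustInputRef(Call("=", List(InputRef(2), InputRef(3))), Map(2 -> 0, 3 -> 1))
    //   == Call("=", List(InputRef(0), InputRef(1)))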
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
index 33e2c6d..ebb4c8b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RankUtil.scala
@@ -294,4 +294,14 @@ object RankUtil {
literals.head
}
+ def getRankNumberColumnIndex(rank: Rank): Option[Int] = {
+ if (rank.outputRankNumber) {
+ require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount + 1)
+ Some(rank.getRowType.getFieldCount - 1)
+ } else {
+ require(rank.getRowType.getFieldCount == rank.getInput.getRowType.getFieldCount)
+ None
+ }
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml
new file mode 100644
index 0000000..33b47e8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.xml
@@ -0,0 +1,277 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testNotTranspose">
+ <Resource name="sql">
+ <![CDATA[
+SELECT category, max_price, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ FROM (
+ SELECT category, shopId, max(price) as max_price
+ FROM T
+ GROUP BY category, shopId
+ ))
+WHERE rank_num <= 3
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(category=[$0], max_price=[$2], rank_num=[$3])
++- LogicalFilter(condition=[<=($3, 3)])
+ +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)])
+ +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[category, max_price, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
+ +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)])
+ +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectRankNumber">
+ <Resource name="sql">
+ <![CDATA[
+SELECT rank_num, rowtime, a, rank_num, a, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num <= 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(rank_num=[$4], rowtime=[$3], a=[$0], rank_num0=[$4], a0=[$0], rank_num1=[$4])
++- LogicalFilter(condition=[<=($4, 2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[rank_num1 AS rank_num, rowtime, a, rank_num1 AS rank_num0, a AS a0, rank_num1])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, rank_num1])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPruneOrderKeys">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num = 1
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($4, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPrunePartitionKeys">
+ <Resource name="sql">
+ <![CDATA[
+SELECT rowtime FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num = 1
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(rowtime=[$3])
++- LogicalFilter(condition=[=($4, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[rowtime])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPushCalcWithConditionIntoRank">
+ <Resource name="sql">
+ <![CDATA[
+SELECT rowtime, c FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num <= 2 AND a > 10
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(rowtime=[$3], c=[$2])
++- LogicalFilter(condition=[AND(<=($4, 2), >($0, 10))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[rowtime, c], where=[>(a, 10)])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, c, rowtime])
+ +- FlinkLogicalCalc(select=[a, c, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPruneRankNumber">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, rowtime FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num <= 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], rowtime=[$3])
++- LogicalFilter(condition=[<=($4, 2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
++- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPruneUniqueKeys">
+ <Resource name="sql">
+ <![CDATA[
+SELECT category, max_price, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ FROM (
+ SELECT category, shopId, max(price) as max_price, min(price) as min_price
+ FROM T
+ GROUP BY category, shopId
+ ))
+WHERE rank_num <= 3
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(category=[$0], max_price=[$2], rank_num=[$4])
++- LogicalFilter(condition=[<=($4, 3)])
+ +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], min_price=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+ +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[category, max_price, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
+ +- FlinkLogicalCalc(select=[category, shopId, max_price])
+ +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+ +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPruneUnusedProject">
+ <Resource name="sql">
+ <![CDATA[
+SELECT category, shopId, max_price, rank_num
+FROM (
+ SELECT category, shopId, max_price,
+ ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ FROM (
+ SELECT category, shopId, max(price) as max_price, min(price) as min_price
+ FROM T
+ GROUP BY category, shopId
+ ))
+WHERE rank_num <= 3
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[$3])
++- LogicalFilter(condition=[<=($3, 3)])
+ +- LogicalProject(category=[$0], shopId=[$1], max_price=[$2], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+ +- LogicalTableScan(table=[[T, source: [TestTableSource(category, shopId, price)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[category], orderBy=[max_price ASC], select=[category, shopId, max_price, w0$o0])
++- FlinkLogicalCalc(select=[category, shopId, max_price])
+ +- FlinkLogicalAggregate(group=[{0, 1}], max_price=[MAX($2)], min_price=[MIN($2)])
+ +- FlinkLogicalTableSourceScan(table=[[T, source: [TestTableSource(category, shopId, price)]]], fields=[category, shopId, price])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTrivialCalcIsRemoved">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, rowtime, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num <= 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], rowtime=[$3], rank_num=[$4])
++- LogicalFilter(condition=[<=($4, 2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
++- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
index 8324972..70bf902 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
@@ -89,8 +89,9 @@ LogicalProject(a=[$0], b=[$1], rk1=[$2], rk2=[$3])
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[a, b, rk2 AS rk1, rk2])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, rk2])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, rk2])
+ +- FlinkLogicalCalc(select=[a, b])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -161,8 +162,9 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(a, 10)])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])
+ +- FlinkLogicalCalc(select=[a, b])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -184,8 +186,8 @@ LogicalProject(a=[$0], b=[$1], rn=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])
++- FlinkLogicalCalc(select=[a, b])
+- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -304,8 +306,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, w0$o0])
++- FlinkLogicalCalc(select=[a, b])
+- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
new file mode 100644
index 0000000..754e016
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCannotRemoveRankNumberColumn1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, rank_num FROM (
+ SELECT *,
+ RANK() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[RANK() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, w0$o0])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotRemoveRankNumberColumn2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 3
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 3))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotRemoveRankNumberColumn3">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCouldRemoveRankNumberColumn">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, rank_num FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ FROM MyTable)
+WHERE rank_num >= 1 AND rank_num < 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], rank_num=[$4])
++- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+ +- LogicalTableScan(table=[[MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, 1:BIGINT AS w0$o0])
++- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
+ +- FlinkLogicalCalc(select=[a, rowtime])
+ +- FlinkLogicalDataStreamTableScan(table=[[_DataStreamTable_0]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
index c73e694..215339f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RankTest.xml
@@ -89,10 +89,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, w0$o0])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0])
+Calc(select=[a, b, c, 1:BIGINT AS w0$o0])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -115,9 +116,10 @@ LogicalProject(a=[$0], rk=[$1], b=[$2], c=[$3])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, rk, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[a], orderBy=[a ASC], select=[a, b, c, proctime, rowtime, rk])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[a], orderBy=[a ASC], select=[a, b, c, rk])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -143,9 +145,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime ASC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime ASC], select=[a, b, c, proctime])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, proctime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -171,9 +174,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime DESC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[proctime DESC], select=[a, b, c, proctime])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, proctime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -201,7 +205,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
Calc(select=[a, b, c])
+- Deduplicate(keepLastRow=[false], key=[a], order=[PROCTIME])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, proctime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -229,7 +234,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
Calc(select=[a, b, c])
+- Deduplicate(keepLastRow=[true], key=[a], order=[PROCTIME])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, proctime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -255,9 +261,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, rowtime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -283,9 +290,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, proctime, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+ +- Calc(select=[a, b, c, rowtime])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -310,9 +318,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
@@ -338,9 +346,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, w0$o0], updateAsRetraction=[false], accMode=[Acc])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
@@ -761,9 +769,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+ +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -867,9 +876,9 @@ LogicalProject(row_num=[$3], a=[$0], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, proctime, rowtime, row_num], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, row_num], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
- +- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)], updateAsRetraction=[false], accMode=[Acc])
+ +- Calc(select=[a, b, c], where=[>(c, 1000)], updateAsRetraction=[false], accMode=[Acc])
+- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
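
Note on the rankStart=10, rankEnd=10 plan above: after this change the Rank no longer emits the rank number column at all, and the Calc on top restores it as the literal 10:BIGINT, since every row that survives the rank range necessarily has row number 10. A query of roughly the following shape exercises that path (a hypothetical example written in the style of the new tests, not taken from the test files):

    val sql =
      """
        |SELECT a, b, c, row_num FROM (
        |  SELECT *,
        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num
        |  FROM MyTable)
        |WHERE row_num = 10
      """.stripMargin
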
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala
new file mode 100644
index 0000000..8843e6b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/CalcRankTransposeRuleTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.FlinkStreamProgram
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[CalcRankTransposeRule]].
+ */
+class CalcRankTransposeRuleTest extends TableTestBase {
+ private val util = streamTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildStreamProgram(FlinkStreamProgram.PHYSICAL)
+
+ util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime)
+ util.addTableSource[(String, Int, String)]("T", 'category, 'shopId, 'price)
+ }
+
+ @Test
+ def testPruneOrderKeys(): Unit = {
+ // Push Calc into Rank, project columns (a, rowtime), prune columns (b, c)
+ val sql =
+ """
+ |SELECT a FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num = 1
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testPrunePartitionKeys(): Unit = {
+ // Push Calc into Rank, project columns (a, rowtime), prune columns (b, c)
+ val sql =
+ """
+ |SELECT rowtime FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num = 1
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testPruneUniqueKeys(): Unit = {
+ // Push Calc into Rank, project columns (category, shopId, max_price), prune column (min_price)
+ val sql =
+ """
+ |SELECT category, max_price, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ | FROM (
+ | SELECT category, shopId, max(price) as max_price, min(price) as min_price
+ | FROM T
+ | GROUP BY category, shopId
+ | ))
+ |WHERE rank_num <= 3
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testNotTranspose(): Unit = {
+ // Do not transpose the Calc into the Rank because there are no columns to prune
+ val sql =
+ """
+ |SELECT category, max_price, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ | FROM (
+ | SELECT category, shopId, max(price) as max_price
+ | FROM T
+ | GROUP BY category, shopId
+ | ))
+ |WHERE rank_num <= 3
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testPruneRankNumber(): Unit = {
+ // Push Calc into Rank, project columns (a, rowtime), prune columns (b, c)
+ val sql =
+ """
+ |SELECT a, rowtime FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num <= 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testProjectRankNumber(): Unit = {
+ // Push Calc into Rank, project columns (a, rowtime), prune columns (b, c)
+ // Need a new Calc on top of the Rank to preserve equivalence
+ val sql =
+ """
+ |SELECT rank_num, rowtime, a, rank_num, a, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num <= 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testTrivialCalcIsRemoved(): Unit = {
+ // Push Calc into Rank, project columns (a, rowtime), prune columns (b, c)
+ // Does not need a new Calc on top of the Rank because the remaining projection is trivial
+ val sql =
+ """
+ |SELECT a, rowtime, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num <= 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testPushCalcWithConditionIntoRank(): Unit = {
+ // Push Calc into Rank even if it has a filter condition, project columns (rowtime, c, a), prune column (b)
+ val sql =
+ """
+ |SELECT rowtime, c FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num <= 2 AND a > 10
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testPruneUnusedProject(): Unit = {
+ // Push Calc into Rank, project columns (category, shopId, max_price), prune column (min_price)
+ val sql =
+ """
+ |SELECT category, shopId, max_price, rank_num
+ |FROM (
+ | SELECT category, shopId, max_price,
+ | ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as rank_num
+ | FROM (
+ | SELECT category, shopId, max(price) as max_price, min(price) as min_price
+ | FROM T
+ | GROUP BY category, shopId
+ | ))
+ |WHERE rank_num <= 3
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+}
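
The tests above all exercise the same pruning computation: the columns kept below the Rank are, at minimum, the union of what the pushed-down Calc projects and the partition and order keys the Rank itself needs (plus unique keys and the rank number column, as testPruneUniqueKeys and testPruneRankNumber show). A minimal, self-contained Scala sketch of that set arithmetic, as an illustration only and not the actual rule code:

    object CalcRankPruneSketch {
      // Union of the fields the outer Calc references with the keys the Rank
      // itself requires; everything else can be pruned below the Rank.
      def fieldsToKeep(
          usedByCalc: Set[Int],
          partitionKeys: Set[Int],
          orderKeys: Set[Int]): Seq[Int] =
        (usedByCalc ++ partitionKeys ++ orderKeys).toSeq.sorted

      def main(args: Array[String]): Unit = {
        // Mirrors testPruneOrderKeys: schema (a=0, b=1, c=2, rowtime=3),
        // SELECT a ... PARTITION BY a ORDER BY rowtime
        // => keep (a, rowtime), prune (b, c)
        println(fieldsToKeep(usedByCalc = Set(0), partitionKeys = Set(0), orderKeys = Set(3)))
        // prints List(0, 3)
      }
    }
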
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala
new file mode 100644
index 0000000..5d48714
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RankNumberColumnRemoveRuleTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.FlinkStreamProgram
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[RankNumberColumnRemoveRule]].
+ */
+class RankNumberColumnRemoveRuleTest extends TableTestBase {
+ private val util = streamTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildStreamProgram(FlinkStreamProgram.PHYSICAL)
+
+ util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime)
+ }
+
+ @Test
+ def testCannotRemoveRankNumberColumn1(): Unit = {
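+ // the rank function is RANK instead of ROW_NUMBER, so the rank number column is not removed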
+ val sql =
+ """
+ |SELECT a, rank_num FROM (
+ | SELECT *,
+ | RANK() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num >= 1 AND rank_num < 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testCannotRemoveRankNumberColumn2(): Unit = {
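+ // the range [1, 3) admits two rank values, so the rank number is not constant and cannot be removed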
+ val sql =
+ """
+ |SELECT a, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num >= 1 AND rank_num < 3
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testCannotRemoveRankNumberColumn3(): Unit = {
+ // the Rank does not output the rank number column, so this rule does not match
+ val sql =
+ """
+ |SELECT a FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num >= 1 AND rank_num < 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testCouldRemoveRankNumberColumn(): Unit = {
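+ // ROW_NUMBER with range [1, 2) pins the rank number to the constant 1, so the
+ // column can be removed from the Rank and restored as a literal by a Calc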
+ val sql =
+ """
+ |SELECT a, rank_num FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+ | FROM MyTable)
+ |WHERE rank_num >= 1 AND rank_num < 2
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+}
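
Taken together, the four cases above pin down when the rank number column is removable: the rank function must be ROW_NUMBER (the RANK() variant in the first test keeps the column even for the same predicate), and the predicate must collapse the rank range to a single value. A self-contained sketch of that condition, assuming (from the test names, not from the rule source) that these are the only two checks:

    object RankNumberRemovalSketch {
      sealed trait RankType
      case object RowNumber extends RankType
      case object RankFunction extends RankType

      // Removable only for ROW_NUMBER with a single-value rank range,
      // e.g. rank_num >= 1 AND rank_num < 2, i.e. the range [1, 1].
      def canRemove(rankType: RankType, rankStart: Long, rankEnd: Long): Boolean =
        rankType == RowNumber && rankStart == rankEnd

      def main(args: Array[String]): Unit = {
        println(canRemove(RankFunction, 1L, 1L)) // false -> testCannotRemoveRankNumberColumn1
        println(canRemove(RowNumber, 1L, 2L))    // false -> testCannotRemoveRankNumberColumn2
        println(canRemove(RowNumber, 1L, 1L))    // true  -> testCouldRemoveRankNumberColumn
      }
    }
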