You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:25 UTC
[flink] 11/13: [FLINK-14338][table-planner][table-planner-blink]
Update files due to CALCITE-3763
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a6877d14eb8f15c9f5124e400f2f3e351f42c13
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 21:31:45 2020 +0800
[FLINK-14338][table-planner][table-planner-blink] Update files due to CALCITE-3763
* CALCITE-3763 prunes useless fields of input project
---
.../table/planner/calcite/FlinkRelBuilder.scala | 20 +++-
.../WindowAggregateReduceFunctionsRule.scala | 15 ++-
.../physical/batch/BatchExecHashAggRule.scala | 13 ++-
.../physical/batch/BatchExecSortAggRule.scala | 17 ++-
.../batch/BatchExecWindowAggregateRule.scala | 18 ++-
.../batch/RemoveRedundantLocalHashAggRule.scala | 4 +-
.../batch/RemoveRedundantLocalSortAggRule.scala | 4 +-
.../table/planner/plan/batch/table/CalcTest.xml | 12 +-
.../FlinkAggregateJoinTransposeRuleTest.xml | 8 +-
.../logical/SimplifyJoinConditionRuleTest.xml | 2 +-
.../batch/RemoveRedundantLocalHashAggRuleTest.xml | 26 +++++
.../batch/RemoveRedundantLocalRankRuleTest.xml | 6 +-
.../batch/RemoveRedundantLocalSortAggRuleTest.xml | 30 +++++
.../planner/plan/stream/table/AggregateTest.xml | 6 +-
.../plan/stream/table/TableAggregateTest.xml | 6 +-
.../plan/stream/table/TwoStageAggregateTest.xml | 6 +-
.../RemoveRedundantLocalHashAggRuleTest.scala | 13 +++
.../RemoveRedundantLocalSortAggRuleTest.scala | 11 ++
.../ExtendedAggregateExtractProjectRule.java | 8 +-
.../flink/table/calcite/FlinkRelBuilder.scala | 21 +++-
.../rules/common/LogicalWindowAggregateRule.scala | 126 ++++++++++++++++++++-
.../WindowAggregateReduceFunctionsRule.scala | 14 ++-
22 files changed, 337 insertions(+), 49 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
index 47f1ab8..3b8eb0e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate,
import org.apache.flink.table.planner.plan.utils.AggregateUtil
import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
import org.apache.flink.table.sinks.TableSink
+
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelCollation
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
@@ -40,6 +41,7 @@ import org.apache.calcite.util.{ImmutableBitSet, Util}
import java.lang.Iterable
import java.util
import java.util.List
+import java.util.function.UnaryOperator
import scala.collection.JavaConversions._
@@ -132,7 +134,23 @@ class FlinkRelBuilder(
namedProperties: List[PlannerNamedWindowProperty],
aggCalls: Iterable[AggCall]): RelBuilder = {
// build logical aggregate
- val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
+
+ // Because of:
+ // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
+ // if the input is a Project.
+ //
+ // the fields can not be pruned if they are referenced by other expressions
+ // of the window aggregation (i.e. the TUMBLE_START/END).
+ // To solve this, we configure the RelBuilder to disable this feature.
+ val aggregate = transform(
+ new UnaryOperator[RelBuilder.Config] {
+ override def apply(t: RelBuilder.Config)
+ : RelBuilder.Config = t.withPruneInputOfAggregate(false)
+ })
+ .push(build())
+ .aggregate(groupKey, aggCalls)
+ .build()
+ .asInstanceOf[LogicalAggregate]
// build logical window aggregate from it
aggregate match {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
index 90dbdc3..f6d7f9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.rules.logical
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate
+import org.apache.calcite.plan.Contexts
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
@@ -39,7 +40,11 @@ import scala.collection.JavaConversions._
class WindowAggregateReduceFunctionsRule
extends AggregateReduceFunctionsRule(
operand(classOf[LogicalWindowAggregate], any()),
- RelFactories.LOGICAL_BUILDER) {
+ RelBuilder.proto(
+ Contexts.of(
+ RelFactories.DEFAULT_STRUCT,
+ RelBuilder.Config.DEFAULT
+ .withPruneInputOfAggregate(false)))) {
override def newAggregateRel(
relBuilder: RelBuilder,
@@ -47,6 +52,14 @@ class WindowAggregateReduceFunctionsRule
newCalls: util.List[AggregateCall]): Unit = {
// create a LogicalAggregate with simpler aggregation functions
+
+ // Because of:
+ // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
+ // if the input is a Project.
+ //
+ // the fields can not be pruned if they are referenced by other expressions
+ // of the window aggregation (i.e. the TUMBLE_START/END).
+ // To solve this, we configure the RelBuilder to disable this feature.
super.newAggregateRel(relBuilder, oldAgg, newCalls)
// pop LogicalAggregate from RelBuilder
val newAgg = relBuilder.build().asInstanceOf[LogicalAggregate]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecHashAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecHashAggRule.scala
index 704d267..cc615f0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecHashAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecHashAggRule.scala
@@ -118,6 +118,17 @@ class BatchExecHashAggRule
} else {
Seq(FlinkRelDistribution.SINGLETON)
}
+ // Remove the global agg call filters because the
+ // filter is already done by local aggregation.
+ val aggCallsWithoutFilter = aggCallsWithoutAuxGroupCalls.map {
+ aggCall =>
+ if (aggCall.filterArg > 0) {
+ aggCall.copy(aggCall.getArgList, -1, aggCall.getCollation)
+ } else {
+ aggCall
+ }
+ }
+ val globalAggCallToAggFunction = aggCallsWithoutFilter.zip(aggFunctions)
globalDistributions.foreach { globalDistribution =>
val requiredTraitSet = localHashAgg.getTraitSet.replace(globalDistribution)
val newLocalHashAgg = RelOptRule.convert(localHashAgg, requiredTraitSet)
@@ -131,7 +142,7 @@ class BatchExecHashAggRule
inputRowType,
globalGroupSet,
globalAuxGroupSet,
- aggCallToAggFunction,
+ globalAggCallToAggFunction,
isMerge = true)
call.transformTo(globalHashAgg)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortAggRule.scala
index 426e9df..bf18c59 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortAggRule.scala
@@ -74,10 +74,6 @@ class BatchExecSortAggRule
val input: RelNode = call.rel(1)
val inputRowType = input.getRowType
- if (agg.indicator) {
- throw new UnsupportedOperationException("Not support group sets aggregate now.")
- }
-
val (auxGroupSet, aggCallsWithoutAuxGroupCalls) = AggregateUtil.checkAndSplitAggCalls(agg)
val (_, aggBufferTypes, aggFunctions) = AggregateUtil.transformToBatchAggregateFunctions(
@@ -124,6 +120,17 @@ class BatchExecSortAggRule
} else {
(Seq(FlinkRelDistribution.SINGLETON), RelCollations.EMPTY)
}
+ // Remove the global agg call filters because the
+ // filter is already done by local aggregation.
+ val aggCallsWithoutFilter = aggCallsWithoutAuxGroupCalls.map {
+ aggCall =>
+ if (aggCall.filterArg > 0) {
+ aggCall.copy(aggCall.getArgList, -1, aggCall.getCollation)
+ } else {
+ aggCall
+ }
+ }
+ val globalAggCallToAggFunction = aggCallsWithoutFilter.zip(aggFunctions)
globalDistributions.foreach { globalDistribution =>
val requiredTraitSet = localSortAgg.getTraitSet
.replace(globalDistribution)
@@ -140,7 +147,7 @@ class BatchExecSortAggRule
newLocalInput.getRowType,
globalGroupSet,
globalAuxGroupSet,
- aggCallToAggFunction,
+ globalAggCallToAggFunction,
isMerge = true)
call.transformTo(globalSortAgg)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
index 7920a1a..bf430cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
@@ -34,12 +34,13 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType}
import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.{Contexts, RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Aggregate.Group
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
import org.apache.calcite.rel.{RelCollations, RelNode}
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.tools.RelBuilder
import org.apache.commons.math3.util.ArithmeticUtils
import scala.collection.JavaConversions._
@@ -70,6 +71,11 @@ class BatchExecWindowAggregateRule
extends RelOptRule(
operand(classOf[FlinkLogicalWindowAggregate],
operand(classOf[RelNode], any)),
+ RelBuilder.proto(
+ Contexts.of(
+ RelFactories.DEFAULT_STRUCT,
+ RelBuilder.Config.DEFAULT
+ .withPruneInputOfAggregate(false))),
"BatchExecWindowAggregateRule")
with BatchExecAggRuleBase {
@@ -156,6 +162,14 @@ class BatchExecWindowAggregateRule
// TODO aggregate include projection now, so do not provide new trait will be safe
val aggProvidedTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+
+ // Because of:
+ // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
+ // if the input is a Project.
+ //
+ // the fields can not be pruned if they are referenced by other expressions
+ // of the window aggregation (i.e. the TUMBLE_START/END).
+ // To solve this, we configure the RelBuilder to disable this feature.
val inputTimeFieldIndex = AggregateUtil.timeFieldIndex(
input.getRowType, call.builder(), window.timeAttribute)
val inputTimeFieldType = agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
index c538dac..33a2bb2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
@@ -49,7 +49,9 @@ class RemoveRedundantLocalHashAggRule extends RelOptRule(
inputOfLocalAgg.getRowType,
localAgg.getGrouping,
localAgg.getAuxGrouping,
- globalAgg.getAggCallToAggFunction,
+ // Use the localAgg agg calls because the global agg call filters were removed,
+ // see BatchExecHashAggRule for details.
+ localAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
index 615d082f..a0ff75e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
@@ -48,7 +48,9 @@ abstract class RemoveRedundantLocalSortAggRule(
inputOfLocalAgg.getRowType,
localAgg.getGrouping,
localAgg.getAuxGrouping,
- globalAgg.getAggCallToAggFunction,
+ // Use the localAgg agg calls because the global agg call filters were removed,
+ // see BatchExecSortAggRule for details.
+ localAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml
index d44892b..4c11764 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml
@@ -168,8 +168,8 @@ Calc(select=[a])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$1])
-+- LogicalAggregate(group=[{4}], EXPR$0=[SUM($0)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], k=[org$apache$flink$table$planner$plan$batch$table$CalcTest$MyHashCode$$1945176195778b1bff1a30c41ce16445($2)])
++- LogicalAggregate(group=[{1}], EXPR$0=[SUM($0)])
+ +- LogicalProject(a=[$0], k=[org$apache$flink$table$planner$plan$batch$table$CalcTest$MyHashCode$$1945176195778b1bff1a30c41ce16445($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
]]>
</Resource>
@@ -179,7 +179,7 @@ Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[true], groupBy=[k], select=[k, Final_SUM(sum$0) AS EXPR$0])
+- Exchange(distribution=[hash[k]])
+- LocalHashAggregate(groupBy=[k], select=[k, Partial_SUM(a) AS sum$0])
- +- Calc(select=[a, b, c, d, MyHashCode$(c) AS k])
+ +- Calc(select=[a, MyHashCode$(c) AS k])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
@@ -188,8 +188,8 @@ Calc(select=[EXPR$0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[$1])
-+- LogicalAggregate(group=[{4}], EXPR$0=[SUM($0)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], k=[UPPER($2)])
++- LogicalAggregate(group=[{1}], EXPR$0=[SUM($0)])
+ +- LogicalProject(a=[$0], k=[UPPER($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
]]>
</Resource>
@@ -199,7 +199,7 @@ Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[true], groupBy=[k], select=[k, Final_SUM(sum$0) AS EXPR$0])
+- Exchange(distribution=[hash[k]])
+- LocalHashAggregate(groupBy=[k], select=[k, Partial_SUM(a) AS sum$0])
- +- Calc(select=[a, b, c, d, UPPER(c) AS k])
+ +- Calc(select=[a, UPPER(c) AS k])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRuleTest.xml
index 0d58e8b..35d417a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRuleTest.xml
@@ -216,8 +216,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[$SUM0($4)])
-+- LogicalProject(a=[$0], $f1=[$1], a0=[$2], $f10=[$3], $f4=[*($1, $3)])
+LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)])
++- LogicalProject($f4=[*($1, $3)])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
:- LogicalProject(a=[$0], $f1=[CASE(IS NOT NULL($0), 1:BIGINT, 0:BIGINT)])
: +- LogicalAggregate(group=[{0}])
@@ -244,8 +244,8 @@ LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[SUM($3)])
-+- LogicalProject(a=[$0], a0=[$1], $f1=[$2], $f3=[CAST(*($0, $2)):INTEGER])
+LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
++- LogicalProject($f3=[CAST(*($0, $2)):INTEGER])
+- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
:- LogicalAggregate(group=[{0}])
: +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c)]]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
index 6c65f74..182519d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
@@ -42,7 +42,7 @@ LogicalProject(a=[$0])
+- LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()])
- +- LogicalProject(a=[$3], b=[$4], $f0=[0])
+ +- LogicalProject(a=[$3], b=[$4])
+- LogicalJoin(condition=[AND(=($0, $3), OR(<($0, 2), =($4, 5)))], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f)]]])
+- LogicalAggregate(group=[{0, 1}])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.xml
index 8c16336..93897ab 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.xml
@@ -81,4 +81,30 @@ HashAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS EXPR$1])
]]>
</Resource>
</TestCase>
+ <TestCase name="testUsingLocalAggCallFilters">
+ <Resource name="sql">
+ <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),
+COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)
+FROM z GROUP BY d]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], EXPR$5=[COUNT(DISTINCT $5) FILTER $6])
++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, z, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[d], select=[d, MIN(EXPR$1) FILTER $g_15 AS EXPR$1, MIN(EXPR$2) FILTER $g_15 AS EXPR$2, COUNT(c) FILTER $g_7 AS EXPR$3, COUNT(c) FILTER $g_3 AS EXPR$4, COUNT(b) FILTER $g_12 AS EXPR$5])
++- Calc(select=[d, c, b, EXPR$1, EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 3), $f4) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 12), $f6) AS $g_12, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGIN [...]
+ +- HashAggregate(isMerge=[true], groupBy=[d, c, $f4, b, $f6, $e], select=[d, c, $f4, b, $f6, $e, Final_MAX(max$0) AS EXPR$1, Final_MAX(max$1) AS EXPR$2])
+ +- Exchange(distribution=[hash[d]])
+ +- LocalHashAggregate(groupBy=[d, c, $f4, b, $f6, $e], select=[d, c, $f4, b, $f6, $e, Partial_MAX(e) AS max$0, Partial_MAX(e) FILTER $f2 AS max$1])
+ +- Expand(projects=[d, e, $f2, c, $f4, b, $f6, $e], projects=[{d, e, $f2, c, $f4, null AS b, null AS $f6, 3 AS $e}, {d, e, $f2, c, null AS $f4, null AS b, null AS $f6, 7 AS $e}, {d, e, $f2, null AS c, null AS $f4, b, $f6, 12 AS $e}, {d, e, $f2, null AS c, null AS $f4, null AS b, null AS $f6, 15 AS $e}])
+ +- Calc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, IS TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6])
+ +- TableSourceScan(table=[[default_catalog, default_database, z, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRuleTest.xml
index 4e7b5c5..df38440 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRuleTest.xml
@@ -100,9 +100,9 @@ LogicalProject(a=[$0], b=[$1], rk=[$2], rk1=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0, w0$o0])
-+- Calc(select=[a, b, w0$o0])
- +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, c, w0$o0])
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, $2, w0$o0])
++- Calc(select=[a, b, $2])
+ +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, c, $2])
+- Sort(orderBy=[a ASC, b ASC])
+- Exchange(distribution=[hash[a]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.xml
index 9a5c7f7..dc55eb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.xml
@@ -83,4 +83,34 @@ Calc(select=[EXPR$0])
]]>
</Resource>
</TestCase>
+ <TestCase name="testUsingLocalAggCallFilters">
+ <Resource name="sql">
+ <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),
+COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)
+FROM z GROUP BY d]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], EXPR$5=[COUNT(DISTINCT $5) FILTER $6])
++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, z, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[d], select=[d, Final_MIN(min$0) AS EXPR$1, Final_MIN(min$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3, Final_COUNT(count$3) AS EXPR$4, Final_COUNT(count$4) AS EXPR$5])
++- Sort(orderBy=[d ASC])
+ +- Exchange(distribution=[hash[d]])
+ +- LocalSortAggregate(groupBy=[d], select=[d, Partial_MIN(EXPR$1) FILTER $g_15 AS min$0, Partial_MIN(EXPR$2) FILTER $g_15 AS min$1, Partial_COUNT(c) FILTER $g_7 AS count$2, Partial_COUNT(c) FILTER $g_3 AS count$3, Partial_COUNT(b) FILTER $g_12 AS count$4])
+ +- Calc(select=[d, c, b, EXPR$1, EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 3), $f4) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 12), $f6) AS $g_12, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), [...]
+ +- Sort(orderBy=[d ASC])
+ +- SortAggregate(isMerge=[false], groupBy=[d, c, $f4, b, $f6, $e], select=[d, c, $f4, b, $f6, $e, MAX(e) AS EXPR$1, MAX(e) FILTER $f2 AS EXPR$2])
+ +- Sort(orderBy=[d ASC, c ASC, $f4 ASC, b ASC, $f6 ASC, $e ASC])
+ +- Exchange(distribution=[hash[d, c, $f4, b, $f6, $e]])
+ +- Expand(projects=[d, e, $f2, c, $f4, b, $f6, $e], projects=[{d, e, $f2, c, $f4, null AS b, null AS $f6, 3 AS $e}, {d, e, $f2, c, null AS $f4, null AS b, null AS $f6, 7 AS $e}, {d, e, $f2, null AS c, null AS $f4, b, $f6, 12 AS $e}, {d, e, $f2, null AS c, null AS $f4, null AS b, null AS $f6, 15 AS $e}])
+ +- Calc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, IS TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6])
+ +- TableSourceScan(table=[[default_catalog, default_database, z, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index b7c4a6e..fd6047b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -102,8 +102,8 @@ Calc(select=[EXPR$0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(b=[$0], EXPR$0=[$1])
-+- LogicalAggregate(group=[{1}], EXPR$0=[AVG($3)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[CAST($0):DOUBLE])
++- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
+ +- LogicalProject(b=[$1], a0=[CAST($0):DOUBLE])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
@@ -111,7 +111,7 @@ LogicalProject(b=[$0], EXPR$0=[$1])
<![CDATA[
GroupAggregate(groupBy=[b], select=[b, AVG(a0) AS EXPR$0])
+- Exchange(distribution=[hash[b]])
- +- Calc(select=[a, b, c, CAST(a) AS a0])
+ +- Calc(select=[b, CAST(a) AS a0])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml
index 5cbc8cf..1cd0641 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml
@@ -53,8 +53,8 @@ Calc(select=[f0 AS a, f1 AS b])
<Resource name="planBefore">
<![CDATA[
LogicalProject(bb=[AS($0, _UTF-16LE'bb')], _c1=[+(AS($1, _UTF-16LE'x'), 1)], y=[AS($2, _UTF-16LE'y')])
-+- LogicalTableAggregate(group=[{5}], tableAggregate=[[EmptyTableAggFunc($0, $1)]])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], bb=[MOD($1, 5)])
++- LogicalTableAggregate(group=[{2}], tableAggregate=[[EmptyTableAggFunc($0, $1)]])
+ +- LogicalProject(a=[$0], b=[$1], bb=[MOD($1, 5)])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c, d, e)]]])
]]>
</Resource>
@@ -63,7 +63,7 @@ LogicalProject(bb=[AS($0, _UTF-16LE'bb')], _c1=[+(AS($1, _UTF-16LE'x'), 1)], y=[
Calc(select=[bb, +(f0, 1) AS _c1, f1 AS y])
+- GroupTableAggregate(groupBy=[bb], select=[bb, EmptyTableAggFunc(a, b) AS (f0, f1)])
+- Exchange(distribution=[hash[bb]])
- +- Calc(select=[a, b, c, d, e, MOD(b, 5) AS bb])
+ +- Calc(select=[a, b, MOD(b, 5) AS bb])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
index be5a7e3..8b59588 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
@@ -39,8 +39,8 @@ Calc(select=[EXPR$0])
<Resource name="planBefore">
<![CDATA[
LogicalProject(b=[$0], EXPR$0=[$1])
-+- LogicalAggregate(group=[{1}], EXPR$0=[AVG($3)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[CAST($0):DOUBLE])
++- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
+ +- LogicalProject(b=[$1], a0=[CAST($0):DOUBLE])
+- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
@@ -49,7 +49,7 @@ LogicalProject(b=[$0], EXPR$0=[$1])
GlobalGroupAggregate(groupBy=[b], select=[b, AVG((sum$0, count$1)) AS EXPR$0])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, AVG(a0) AS (sum$0, count$1)])
- +- Calc(select=[a, b, c, CAST(a) AS a0])
+ +- Calc(select=[b, CAST(a) AS a0])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
index cad9754..d155b5c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
@@ -35,6 +35,7 @@ class RemoveRedundantLocalHashAggRuleTest extends TableTestBase {
def setup(): Unit = {
util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
+ util.addTableSource[(Int, Long, Long, Long, Long)]("z", 'a, 'b, 'c, 'd, 'e)
}
@Test
@@ -69,4 +70,16 @@ class RemoveRedundantLocalHashAggRuleTest extends TableTestBase {
util.verifyPlan(sqlQuery)
}
+ @Test
+ def testUsingLocalAggCallFilters(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setString(
+ ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
+ util.tableEnv.getConfig.getConfiguration.setBoolean(
+ BatchExecJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+ val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" +
+ "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" +
+ "FROM z GROUP BY d"
+ util.verifyPlan(sqlQuery)
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
index a7f72c9..2957254 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
@@ -35,6 +35,7 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
def setup(): Unit = {
util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
+ util.addTableSource[(Int, Long, Long, Long, Long)]("z", 'a, 'b, 'c, 'd, 'e)
}
@Test
@@ -64,4 +65,14 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
util.verifyPlan(sqlQuery)
}
+ @Test
+ def testUsingLocalAggCallFilters(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setString(
+ ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+ val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" +
+ "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" +
+ "FROM z GROUP BY d"
+ util.verifyPlan(sqlQuery)
+ }
+
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
index 80c297e..ba703a4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate;
import org.apache.flink.table.plan.logical.rel.LogicalWindowTableAggregate;
import org.apache.flink.table.plan.logical.rel.TableAggregate;
+import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.RelNode;
@@ -63,7 +64,12 @@ public class ExtendedAggregateExtractProjectRule extends AggregateExtractProject
public static final ExtendedAggregateExtractProjectRule INSTANCE =
new ExtendedAggregateExtractProjectRule(
operand(SingleRel.class,
- operand(RelNode.class, any())), RelFactories.LOGICAL_BUILDER);
+ operand(RelNode.class, any())),
+ RelBuilder.proto(
+ Contexts.of(
+ RelFactories.DEFAULT_STRUCT,
+ RelBuilder.Config.DEFAULT
+ .withPruneInputOfAggregate(false))));
public ExtendedAggregateExtractProjectRule(
RelOptRuleOperand operand,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 75a57b3..838c99a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.calcite
import java.lang.Iterable
import java.util.{List => JList}
-
import org.apache.calcite.plan._
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.calcite.tools.RelBuilder
@@ -33,6 +32,8 @@ import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.logical.rel.{LogicalTableAggregate, LogicalWindowAggregate, LogicalWindowTableAggregate}
import org.apache.flink.table.runtime.aggregate.AggregateUtil
+import java.util.function.UnaryOperator
+
import scala.collection.JavaConverters._
/**
@@ -86,7 +87,23 @@ class FlinkRelBuilder(
aggCalls: Iterable[AggCall])
: RelBuilder = {
// build logical aggregate
- val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
+
+ // Because of:
+ // [CALCITE-3763] RelBuilder.aggregate should prune unused fields
+ // from the input, if the input is a Project.
+ //
+ // the field cannot be pruned if it is referenced by other expressions
+ // of the window aggregation (i.e. the TUMBLE_START/END).
+ // To solve this, we configure the RelBuilder to disable this feature.
+ val aggregate = transform(
+ new UnaryOperator[RelBuilder.Config] {
+ override def apply(t: RelBuilder.Config)
+ : RelBuilder.Config = t.withPruneInputOfAggregate(false)
+ })
+ .push(build())
+ .aggregate(groupKey, aggCalls)
+ .build()
+ .asInstanceOf[LogicalAggregate]
val namedProperties = windowProperties.asScala.map {
case Alias(p: WindowProperty, name, _) =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
index 431fe9e..a866b65 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
@@ -21,23 +21,33 @@ import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall, Project, RelFactories}
import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.catalog.BasicOperatorTable
import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.RelBuilder
+
+import _root_.java.util.{ArrayList => JArrayList, Collections, List => JList}
import _root_.scala.collection.JavaConversions._
abstract class LogicalWindowAggregateRule(ruleName: String)
extends RelOptRule(
RelOptRule.operand(classOf[LogicalAggregate],
RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())),
+ RelBuilder.proto(
+ Contexts.of(
+ RelFactories.DEFAULT_STRUCT,
+ RelBuilder.Config.DEFAULT
+ .withBloat(-1))),
ruleName) {
override def matches(call: RelOptRuleCall): Boolean = {
@@ -50,7 +60,7 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
throw new TableException("Only a single window group function may be used in GROUP BY")
}
- !groupSets && !agg.indicator && windowExpressions.nonEmpty
+ !groupSets && windowExpressions.nonEmpty
}
/**
@@ -61,8 +71,15 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
* that the types are equivalent.
*/
override def onMatch(call: RelOptRuleCall): Unit = {
- val agg = call.rel[LogicalAggregate](0)
- val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+ val agg0 = call.rel[LogicalAggregate](0)
+ val project0 = call.rel[LogicalProject](1)
+ val project = rewriteWindowCallWithFuncOperands(project0, call.builder())
+ val agg = if (project != project0) {
+ agg0.copy(agg0.getTraitSet, Collections.singletonList(project))
+ .asInstanceOf[LogicalAggregate]
+ } else {
+ agg0
+ }
val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head
val window = translateWindowExpression(windowExpr, project.getInput.getRowType)
@@ -90,7 +107,6 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
// we don't use the builder here because it uses RelMetadataQuery which affects the plan
val newAgg = LogicalAggregate.create(
newProject,
- agg.indicator,
newGroupSet,
ImmutableList.of(newGroupSet),
finalCalls)
@@ -125,6 +141,104 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
call.transformTo(result)
}
+ /** Trim out the HepRelVertex wrapper and get current relational expression. */
+ private def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ /**
+ * Rewrite a plan whose window call has a function call as its operand: rewrite the window
+ * call to reference the input instead of invoking the function directly, in order to
+ * simplify the subsequent rewrite logic.
+ *
+ * For example, plan
+ * <pre>
+ * LogicalAggregate(group=[{0}], a=[COUNT()])
+ * LogicalProject($f0=[$TUMBLE(TUMBLE_ROWTIME($0), 4:INTERVAL SECOND)], a=[$1])
+ * LogicalProject($f0=[1970-01-01 00:00:00:TIMESTAMP(3)], a=[$0])
+ * </pre>
+ *
+ * would be rewritten to
+ * <pre>
+ * LogicalAggregate(group=[{0}], a=[COUNT()])
+ * LogicalProject($f0=[TUMBLE($1, 4:INTERVAL SECOND)], a=[$0])
+ * LogicalProject(a=[$1], zzzzz=[TUMBLE_ROWTIME($0)])
+ * LogicalProject($f0=[1970-01-01 00:00:00:TIMESTAMP(3)], a=[$0])
+ * </pre>
+ */
+ private def rewriteWindowCallWithFuncOperands(
+ project: LogicalProject,
+ relBuilder: RelBuilder): LogicalProject = {
+ val projectInput = trimHep(project.getInput)
+ if (!projectInput.isInstanceOf[Project]) {
+ return project
+ }
+ val inputProjects = projectInput.asInstanceOf[Project].getChildExps
+ var hasWindowCallWithFuncOperands: Boolean = false
+ var lastIdx = projectInput.getRowType.getFieldCount - 1;
+ val pushDownCalls = new JArrayList[RexNode]()
+ 0 until projectInput.getRowType.getFieldCount foreach {
+ idx => pushDownCalls.add(RexInputRef.of(idx, projectInput.getRowType))
+ }
+ val newProjectExprs = project.getChildExps.map {
+ case call: RexCall if isWindowCall(call) &&
+ isTimeAttributeCall(call.getOperands.head, inputProjects) =>
+ hasWindowCallWithFuncOperands = true
+ // Update the window call to reference a RexInputRef instead of a function call.
+ call.accept(
+ new RexShuttle {
+ override def visitCall(call: RexCall): RexNode = {
+ if (isTimeAttributeCall(call, inputProjects)) {
+ lastIdx += 1
+ pushDownCalls.add(call)
+ relBuilder.getRexBuilder.makeInputRef(
+ call.getType,
+ // The additional function call is projected at the end of the
+ // input projection, so reference that appended index.
+ lastIdx)
+ } else {
+ super.visitCall(call)
+ }
+ }
+ })
+ case rex: RexNode => rex
+ }
+
+ if (hasWindowCallWithFuncOperands) {
+ relBuilder
+ .push(projectInput)
+ // project plus the function call.
+ .project(pushDownCalls)
+ .project(newProjectExprs, project.getRowType.getFieldNames)
+ .build()
+ .asInstanceOf[LogicalProject]
+ } else {
+ project
+ }
+ }
+
+ /** Decides if the [[RexNode]] is a call on input refs whose return
+ * type is a time indicator type. */
+ def isTimeAttributeCall(rexNode: RexNode, projects: JList[RexNode]): Boolean = rexNode match {
+ case call: RexCall if FlinkTypeFactory.isTimeIndicatorType(call.getType) =>
+ call.getOperands.forall { operand =>
+ operand.isInstanceOf[RexInputRef]
+ }
+ case _ => false
+ }
+
+ /** Decides whether the [[RexCall]] is a window call. */
+ def isWindowCall(call: RexCall): Boolean = call.getOperator match {
+ case BasicOperatorTable.SESSION |
+ BasicOperatorTable.HOP |
+ BasicOperatorTable.TUMBLE => true
+ case _ => false
+ }
+
/**
* Change the types of [[AggregateCall]] to the corresponding inferred types.
*/
@@ -182,7 +296,7 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
private[table] def getWindowExpressions(agg: LogicalAggregate): Seq[(RexCall, Int)] = {
- val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+ val project = trimHep(agg.getInput).asInstanceOf[LogicalProject]
val groupKeys = agg.getGroupSet
// get grouping expressions
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
index 50c0758..38bcaf5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
@@ -19,8 +19,7 @@
package org.apache.flink.table.plan.rules.common
import java.util
-
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{Contexts, RelOptRule}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
import org.apache.calcite.rel.logical.LogicalAggregate
@@ -35,9 +34,14 @@ import scala.collection.JavaConversions._
* Rule to convert complex aggregation functions into simpler ones.
* Have a look at [[AggregateReduceFunctionsRule]] for details.
*/
-class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule(
- RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.any()),
- RelFactories.LOGICAL_BUILDER) {
+class WindowAggregateReduceFunctionsRule
+ extends AggregateReduceFunctionsRule(
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.any()),
+ RelBuilder.proto(
+ Contexts.of(
+ RelFactories.DEFAULT_STRUCT,
+ RelBuilder.Config.DEFAULT
+ .withPruneInputOfAggregate(false)))) {
override def newAggregateRel(
relBuilder: RelBuilder,