You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/02/02 02:54:01 UTC
[flink] branch master updated: [FLINK-20345][table-planner-blink]
SplitAggregateRule: Adds an Expand node only if there are multiple distinct
aggregate functions
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a334b92 [FLINK-20345][table-planner-blink] SplitAggregateRule: Adds an Expand node only if there are multiple distinct aggregate functions
a334b92 is described below
commit a334b921dab093a20120a16589af991302e7cbb9
Author: Jing Zhang <be...@126.com>
AuthorDate: Tue Feb 2 10:53:37 2021 +0800
[FLINK-20345][table-planner-blink] SplitAggregateRule: Adds an Expand node only if there are multiple distinct aggregate functions
This closes #14602
---
.../plan/rules/logical/SplitAggregateRule.scala | 81 +++++++++++--------
.../plan/rules/logical/SplitAggregateRuleTest.xml | 72 ++++++++++-------
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 90 +++++++++-------------
.../stream/sql/agg/IncrementalAggregateTest.xml | 50 +++++-------
.../rules/logical/SplitAggregateRuleTest.scala | 20 ++++-
.../runtime/stream/sql/SplitAggregateITCase.scala | 22 ++++++
6 files changed, 196 insertions(+), 139 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index 6f8274e..e6214eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -65,43 +65,52 @@ import scala.collection.JavaConversions._
* +-----+-----+-----+
*
* SQL:
- * SELECT SUM(b), COUNT(DISTINCT c), AVG(b) FROM MyTable GROUP BY a
+ * SELECT SUM(DISTINCT b), COUNT(DISTINCT c), AVG(b) FROM MyTable GROUP BY a
*
* flink logical plan:
* {{{
- * FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS
- * $f3])
- * +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($2)], agg#1=[$SUM0($3)], agg#2=[$SUM0($4)],
- * agg#3=[$SUM0($5)])
- * +- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[SUM($1) FILTER $4], agg#1=[COUNT(DISTINCT $2)
- * FILTER $5], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
- * +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
- * +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]},
- * {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
- * +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(c), 1024) AS $f3])
+ * FlinkLogicalCalc(select=[$f1 AS EXPR$0, $f2 AS EXPR$1, CAST(IF(=($f4, 0:BIGINT), null:INTEGER,
+ * /($f3, $f4))) AS EXPR$2])
+ * +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($3)], agg#1=[$SUM0($4)], agg#2=[$SUM0($5)],
+ * agg#3=[$SUM0($6)])
+ * +- FlinkLogicalAggregate(group=[{0, 3, 4}], agg#0=[SUM(DISTINCT $1) FILTER $5],
+ * agg#1=[COUNT(DISTINCT $2) FILTER $6], agg#2=[$SUM0($1) FILTER $7],
+ * agg#3=[COUNT($1) FILTER $7])
+ * +- FlinkLogicalCalc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2,
+ * =($e, 3) AS $g_3])
+ * +- FlinkLogicalExpand(projects=[a, b, c, $f3, $f4, $e])
+ * +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3,
+ * MOD(HASH_CODE(c), 1024) AS $f4])
* +- FlinkLogicalTableSourceScan(table=[[MyTable,
* source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
* }}}
*
- * '$e = 0' is equivalent to 'group by a, hash(c) % 256'
- * '$e = 1' is equivalent to 'group by a'
+ * '$e = 1' is equivalent to 'group by a, hash(b) % 1024'
+ * '$e = 2' is equivalent to 'group by a, hash(c) % 1024'
+ * '$e = 3' is equivalent to 'group by a'
*
* Expanded records:
- * +-----+-----+-----+------------------+-----+
- * | a | b | c | hash(c) % 256 | $e |
- * +-----+-----+-----+------------------+-----+ ---+---
- * | 1 | 1 | null| null | 1 | |
- * +-----+-----+-----+------------------+-----| records expanded by record1
- * | 1 | 1 | c1 | hash(c1) % 256 | 0 | |
- * +-----+-----+-----+------------------+-----+ ---+---
- * | 1 | 2 | null| null | 1 | |
- * +-----+-----+-----+------------------+-----+ records expanded by record2
- * | 1 | 2 | c1 | hash(c1) % 256 | 0 | |
- * +-----+-----+-----+------------------+-----+ ---+---
- * | 2 | 1 | null| null | 1 | |
- * +-----+-----+-----+------------------+-----+ records expanded by record3
- * | 2 | 1 | c2 | hash(c2) % 256 | 0 | |
- * +-----+-----+-----+------------------+-----+ ---+---
+ * +-----+-----+-----+------------------+------------------+-----+
+ * | a | b | c | hash(b) % 1024 | hash(c) % 1024 | $e |
+ * +-----+-----+-----+------------------+------------------+-----+ ---+---
+ * | 1 | 1 | c1 | hash(b) % 1024 | null | 1 | |
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 1 | 1 | c1 | null | hash(c) % 1024 | 2 | records expanded by record1
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 1 | 1 | c1 | null | null | 3 | |
+ * +-----+-----+-----+------------------+------------------+-----+ ---+---
+ * | 1 | 2 | c1 | hash(b) % 1024 | null | 1 | |
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 1 | 2 | c1 | null | hash(c) % 1024 | 2 | records expanded by record2
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 1 | 2 | c1 | null | null | 3 | |
+ * +-----+-----+-----+------------------+------------------+-----+ ---+---
+ * | 2 | 1 | c2 | hash(b) % 1024 | null | 1 | |
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 2 | 1 | c2 | null | hash(c) % 1024 | 2 | records expanded by record3
+ * +-----+-----+-----+------------------+------------------+-----+ |
+ * | 2 | 1 | c2 | null | null | 3 | |
+ * +-----+-----+-----+------------------+------------------+-----+ ---+---
*
* NOTES: this rule is only used for Stream now.
*/
@@ -166,12 +175,24 @@ class SplitAggregateRule extends RelOptRule(
// STEP 2: construct partial aggregates
val groupSetTreeSet = new util.TreeSet[ImmutableBitSet](ImmutableBitSet.ORDERING)
val aggInfoToGroupSetMap = new util.HashMap[AggregateCall, ImmutableBitSet]()
+ var newGroupSetsNum = 0
aggCalls.foreach { aggCall =>
val groupSet = if (SplitAggregateRule.needAddHashFields(aggCall)) {
val newIndexes = SplitAggregateRule.getArgIndexes(aggCall).map { argIndex =>
hashFieldsMap.getOrElse(argIndex, argIndex).asInstanceOf[Integer]
}.toSeq
- ImmutableBitSet.of(newIndexes).union(ImmutableBitSet.of(aggGroupSet: _*))
+ val newGroupSet = ImmutableBitSet.of(newIndexes).union(ImmutableBitSet.of(aggGroupSet: _*))
+ // Only increment the group-set count if the aggregate call adds a new, distinct set of hash fields
+ // e.g. SQL1: SELECT COUNT(DISTINCT a), MAX(a) FROM T GROUP BY b
+ // newGroupSetsNum is 1 because both aggregate functions add the same hash field
+ // e.g. SQL2: SELECT COUNT(DISTINCT a), COUNT(b) FROM T GROUP BY c
+ // newGroupSetsNum is 1 because only COUNT(DISTINCT a) adds a new hash field
+ // e.g. SQL3: SELECT COUNT(DISTINCT a), COUNT(DISTINCT b) FROM T GROUP BY c
+ // newGroupSetsNum is 2 because COUNT(DISTINCT a) and COUNT(DISTINCT b) each add a different hash field
+ if (!groupSetTreeSet.contains(newGroupSet)) {
+ newGroupSetsNum += 1
+ }
+ newGroupSet
} else {
ImmutableBitSet.of(aggGroupSet: _*)
}
@@ -195,7 +216,7 @@ class SplitAggregateRule extends RelOptRule(
}
}
- val needExpand = groupSets.size() > 1
+ val needExpand = newGroupSetsNum > 1
val duplicateFieldMap = if (needExpand) {
val (duplicateFieldMap, _) = ExpandUtil.buildExpandNode(
cluster, relBuilder, partialAggCalls, fullGroupSet, groupSets)
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index bc61839..4ca2d22 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -38,11 +38,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[SUM($4)])
-+- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $5], agg#1=[SUM($1) FILTER $6], agg#2=[SUM($1) FILTER $7])
- +- FlinkLogicalCalc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
- +- FlinkLogicalExpand(projects=[a, b, $f2, $f3, $f4, $e])
- +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $2], agg#1=[SUM($1) FILTER $3], agg#2=[SUM($1) FILTER $2])
+ +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -78,18 +76,14 @@ LogicalProject(c=[$0], b=[$1], d=[$2], a=[$3], a0=[$4], b0=[$5], c0=[$6])
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner])
-:- FlinkLogicalCalc(select=[$f2 AS c, CAST($f1) AS b, CAST($f1) AS d, $f2_0 AS a])
+:- FlinkLogicalCalc(select=[c, d AS b, d, a])
: +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($2)], agg#1=[$SUM0($3)])
-: +- FlinkLogicalAggregate(group=[{2, 3}], agg#0=[SUM($1) FILTER $4], agg#1=[COUNT(DISTINCT $0) FILTER $5])
-: +- FlinkLogicalCalc(select=[a, $f1, $f2, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
-: +- FlinkLogicalExpand(projects=[a, $f1, $f2, $f3, $e])
-: +- FlinkLogicalCalc(select=[a, $f1, $f2, MOD(HASH_CODE(a), 1024) AS $f3])
-: +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
-: +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4])
-: +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-: +- FlinkLogicalExpand(projects=[a, b, $f2, $e])
-: +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+: +- FlinkLogicalAggregate(group=[{2, 3}], agg#0=[SUM($1)], agg#1=[COUNT(DISTINCT $0)])
+: +- FlinkLogicalCalc(select=[a, $f1, $f2, MOD(HASH_CODE(a), 1024) AS $f3])
+: +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
+: +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)], agg#1=[SUM($1)])
+: +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -169,6 +163,36 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM(
]]>
</Resource>
</TestCase>
+ <TestCase name="testMultipleDistinctAggOnSameColumn">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ COUNT(DISTINCT b),
+ COUNT(DISTINCT b) FILTER(WHERE b <> 5),
+ SUM(b),
+ AVG(b)
+FROM MyTable
+GROUP BY a
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[SUM($1)], EXPR$4=[AVG($1)])
++- LogicalProject(a=[$0], b=[$1], $f2=[IS TRUE(<>($1, 5))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, $f1, $f2, $f3, CAST(IF(=($f5, 0:BIGINT), null:INTEGER, /($f4, $f5))) AS $f4])
++- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[$SUM0($3)], agg#2=[SUM($4)], agg#3=[$SUM0($5)], agg#4=[$SUM0($6)])
+ +- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[COUNT(DISTINCT $1)], agg#1=[COUNT(DISTINCT $1) FILTER $2], agg#2=[SUM($1)], agg#3=[$SUM0($1)], agg#4=[COUNT($1)])
+ +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 5)) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSingleDistinctAgg">
<Resource name="sql">
<![CDATA[SELECT COUNT(DISTINCT c) FROM MyTable]]>
@@ -231,11 +255,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
<![CDATA[
FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS $f3])
+- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
- +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
- +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
- +- FlinkLogicalExpand(projects=[a, b, $f2, $e])
- +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)], agg#1=[SUM($1)], agg#2=[$SUM0($1)], agg#3=[COUNT($1)])
+ +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -357,11 +379,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
- +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
- +- FlinkLogicalExpand(projects=[a, b, c, $f3, $e])
- +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2)], agg#1=[COUNT(DISTINCT $1)])
+ +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index 3ccc522..82f4622 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -98,13 +98,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
<![CDATA[
GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2) AS $f1, SUM_RETRACT($f3) AS $f2, SUM_RETRACT($f4_0) AS $f3])
+- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(DISTINCT b) FILTER $g_0 AS $f2, SUM(b) FILTER $g_1 AS $f3, SUM(b) FILTER $g_10 AS $f4_0])
+ +- GroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(DISTINCT b) FILTER $f2 AS $f2, SUM(b) FILTER $f3 AS $f3, SUM(b) FILTER $f2 AS $f4_0])
+- Exchange(distribution=[hash[a, $f4]])
- +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
- +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
- +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -134,12 +132,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), SUM_RETRACT($f4_0) AS (sum$3, count$4), COUNT_RETRACT(*) AS count1$5])
+- GlobalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 count$0) AS $f2, SUM(sum$1) AS $f3, SUM(sum$2) AS $f4_0])
+- Exchange(distribution=[hash[a, $f4]])
- +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
- +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
- +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -961,13 +957,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3])
+- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
+- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
+ +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, SUM(b) AS $f3, $SUM0(b) AS $f4, COUNT(b) AS $f5])
+- Exchange(distribution=[hash[a, $f2]])
- +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
- +- Expand(projects=[a, b, $f2, $e])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -990,12 +984,10 @@ Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3]
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
+- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
+- Exchange(distribution=[hash[a, $f2]])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
- +- Expand(projects=[a, b, $f2, $e])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, $SUM0(b) AS sum$2, COUNT(b) AS count$3, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1157,15 +1149,13 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT()])
<![CDATA[
GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2], changelogMode=[I,UA,D])
+- Exchange(distribution=[hash[a]], changelogMode=[I,UB,UA,D])
- +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(DISTINCT b) FILTER $g_0 AS $f2_0, COUNT_RETRACT(*) FILTER $g_1 AS $f3], changelogMode=[I,UB,UA,D])
+ +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(DISTINCT b) AS $f2_0, COUNT_RETRACT(*) AS $f3], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[a, $f2]], changelogMode=[I,UB,UA])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
- +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[c]], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[c]], changelogMode=[I])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
]]>
</Resource>
</TestCase>
@@ -1196,15 +1186,13 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, $SUM0_RETRACT($f3) AS sum$1, COUNT_RETRACT(*) AS count1$2], changelogMode=[I])
+- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 count$0) AS $f2_0, COUNT_RETRACT(count1$1) AS $f3], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], changelogMode=[I])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
- +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
- +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[c]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) AS count$0, COUNT_RETRACT(*) AS count1$1, DISTINCT(b) AS distinct$0], changelogMode=[I])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+ +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[c]], changelogMode=[I])
+ +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
]]>
</Resource>
</TestCase>
@@ -1444,13 +1432,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
<![CDATA[
GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+ +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS $f2, COUNT(DISTINCT b) AS $f3_0])
+- Exchange(distribution=[hash[a, $f3]])
- +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
- +- Expand(projects=[a, b, c, $f3, $e])
- +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1472,12 +1458,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_R
+- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
+- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
+- Exchange(distribution=[hash[a, $f3]])
- +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
- +- Expand(projects=[a, b, c, $f3, $e])
- +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index f8167c1..b46d2d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -41,12 +41,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Exchange(distribution=[hash[a]])
+- IncrementalGroupAggregate(partialAggGrouping=[a, $f4], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, SUM(sum$2) AS sum$2])
+- Exchange(distribution=[hash[a, $f4]])
- +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
- +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
- +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -291,12 +289,10 @@ Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3]
+- Exchange(distribution=[hash[a]])
+- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
+- Exchange(distribution=[hash[a, $f2]])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
- +- Expand(projects=[a, b, $f2, $e])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, $SUM0(b) AS sum$2, COUNT(b) AS count$3, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -322,19 +318,17 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT()])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(count$0) AS $f1, $SUM0_RETRACT(count1$1) AS $f2], changelogMode=[I,UA,D])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, $SUM0(count1$1) AS $f2], changelogMode=[I,UA,D])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0, COUNT_RETRACT(count1$1) AS count1$1, COUNT_RETRACT(count1$2) AS count1$2], changelogMode=[I])
+ +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0, COUNT_RETRACT(count1$1) AS count1$1], changelogMode=[I])
+- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], changelogMode=[I])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
- +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
- +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[c]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) AS count$0, COUNT_RETRACT(*) AS count1$1, DISTINCT(b) AS distinct$0], changelogMode=[I])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+ +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[c]], changelogMode=[I])
+ +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
]]>
</Resource>
</TestCase>
@@ -405,12 +399,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((
+- Exchange(distribution=[hash[a]])
+- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+- Exchange(distribution=[hash[a, $f3]])
- +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
- +- Expand(projects=[a, b, c, $f3, $e])
- +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 8f433c3..4dbce13 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test
-/**
+/**
* Test for [[SplitAggregateRule]].
*/
class SplitAggregateRuleTest extends TableTestBase {
@@ -168,4 +168,22 @@ class SplitAggregateRuleTest extends TableTestBase {
val sqlQuery = "SELECT COUNT(DISTINCT c) FROM MyTable"
util.verifyRelPlan(sqlQuery)
}
+
+  @Test // Several DISTINCT aggs on one column (one filtered) plus plain aggs, split enabled.
+  def testMultipleDistinctAggOnSameColumn(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    val sqlQuery =
+      """
+        |SELECT
+        |  a,
+        |  COUNT(DISTINCT b),
+        |  COUNT(DISTINCT b) FILTER(WHERE b <> 5),
+        |  SUM(b),
+        |  AVG(b)
+        |FROM MyTable
+        |GROUP BY a
+        |""".stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index c1c34c7..d799318 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -390,6 +390,28 @@ class SplitAggregateITCase(
val expected = List("1,1,50", "1,ALL,50")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+
+  @Test // End-to-end: two DISTINCT counts on b mixed with filtered MAX/MIN on b.
+  def testMultipleDistinctAggOnSameColumn(): Unit = {
+    val t1 = tEnv.sqlQuery(
+      """
+        |SELECT
+        |  a,
+        |  COUNT(DISTINCT b),
+        |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
+        |  MAX(b) FILTER (WHERE NOT b = 5),
+        |  MIN(b) FILTER (WHERE NOT b = 2)
+        |FROM T
+        |GROUP BY a
+        |""".stripMargin)
+
+    val sink = new TestingRetractSink
+    t1.toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List("1,2,1,2,1", "2,4,3,4,3", "3,1,1,null,5", "4,2,2,6,5")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
}
object SplitAggregateITCase {