You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/02/02 02:54:01 UTC

[flink] branch master updated: [FLINK-20345][table-planner-blink] SplitAggregateRule: Adds an Expand node only if there are multiple distinct aggregate functions

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a334b92  [FLINK-20345][table-planner-blink] SplitAggregateRule: Adds an Expand node only if there are multiple distinct aggregate functions
a334b92 is described below

commit a334b921dab093a20120a16589af991302e7cbb9
Author: Jing Zhang <be...@126.com>
AuthorDate: Tue Feb 2 10:53:37 2021 +0800

    [FLINK-20345][table-planner-blink] SplitAggregateRule: Adds an Expand node only if there are multiple distinct aggregate functions
    
    This closes #14602
---
 .../plan/rules/logical/SplitAggregateRule.scala    | 81 +++++++++++--------
 .../plan/rules/logical/SplitAggregateRuleTest.xml  | 72 ++++++++++-------
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 90 +++++++++-------------
 .../stream/sql/agg/IncrementalAggregateTest.xml    | 50 +++++-------
 .../rules/logical/SplitAggregateRuleTest.scala     | 20 ++++-
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 22 ++++++
 6 files changed, 196 insertions(+), 139 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index 6f8274e..e6214eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -65,43 +65,52 @@ import scala.collection.JavaConversions._
   * +-----+-----+-----+
   *
   * SQL:
-  * SELECT SUM(b), COUNT(DISTINCT c), AVG(b) FROM MyTable GROUP BY a
+  * SELECT SUM(DISTINCT b), COUNT(DISTINCT c), AVG(b) FROM MyTable GROUP BY a
   *
   * flink logical plan:
   * {{{
-  * FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS
-  *     $f3])
-  * +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($2)], agg#1=[$SUM0($3)], agg#2=[$SUM0($4)],
-  *        agg#3=[$SUM0($5)])
-  *    +- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[SUM($1) FILTER $4], agg#1=[COUNT(DISTINCT $2)
-  *           FILTER $5], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
-  *       +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
-  *          +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]},
-  *                 {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
-  *             +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(c), 1024) AS $f3])
+  * FlinkLogicalCalc(select=[$f1 AS EXPR$0, $f2 AS EXPR$1, CAST(IF(=($f4, 0:BIGINT), null:INTEGER,
+  *  /($f3, $f4))) AS EXPR$2])
+  * +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($3)], agg#1=[$SUM0($4)], agg#2=[$SUM0($5)],
+  * agg#3=[$SUM0($6)])
+  *    +- FlinkLogicalAggregate(group=[{0, 3, 4}], agg#0=[SUM(DISTINCT $1) FILTER $5],
+  *    agg#1=[COUNT(DISTINCT $2) FILTER $6], agg#2=[$SUM0($1) FILTER $7],
+  *    agg#3=[COUNT($1) FILTER $7])
+  *       +- FlinkLogicalCalc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2,
+  *       =($e, 3) AS $g_3])
+  *          +- FlinkLogicalExpand(projects=[a, b, c, $f3, $f4, $e])
+  *             +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3,
+  *               MOD(HASH_CODE(c), 1024) AS $f4])
   *                +- FlinkLogicalTableSourceScan(table=[[MyTable,
   *                       source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   * }}}
   *
-  * '$e = 0' is equivalent to 'group by a, hash(c) % 256'
-  * '$e = 1' is equivalent to 'group by a'
+  * '$e = 1' is equivalent to 'group by a, hash(b) % 1024'
+  * '$e = 2' is equivalent to 'group by a, hash(c) % 1024'
+  * '$e = 3' is equivalent to 'group by a
   *
   * Expanded records:
-  * +-----+-----+-----+------------------+-----+
-  * |  a  |  b  |  c  |  hash(c) % 256   | $e  |
-  * +-----+-----+-----+------------------+-----+        ---+---
-  * |  1  |  1  | null|       null       |  1  |           |
-  * +-----+-----+-----+------------------+-----|  records expanded by record1
-  * |  1  |  1  |  c1 |  hash(c1) % 256  |  0  |           |
-  * +-----+-----+-----+------------------+-----+        ---+---
-  * |  1  |  2  | null|       null       |  1  |           |
-  * +-----+-----+-----+------------------+-----+  records expanded by record2
-  * |  1  |  2  |  c1 |  hash(c1) % 256  |  0  |           |
-  * +-----+-----+-----+------------------+-----+        ---+---
-  * |  2  |  1  | null|       null       |  1  |           |
-  * +-----+-----+-----+------------------+-----+  records expanded by record3
-  * |  2  |  1  |  c2 |  hash(c2) % 256  |  0  |           |
-  * +-----+-----+-----+------------------+-----+        ---+---
+  * +-----+-----+-----+------------------+------------------+-----+
+  * |  a  |  b  |  c  |  hash(b) % 1024  |  hash(c) % 1024  | $e  |
+  * +-----+-----+-----+------------------+------------------+-----+               ---+---
+  * |  1  |  1  |  c1 | hash(b) % 1024   |  null            | 1   |                  |
+  * +-----+-----+-----+------------------+------------------+-----+                  |
+  * |  1  |  1  |  c1 | null             |  hash(c) % 1024 |  2   |    records expanded by record1
+  * +-----+-----+-----+------------------+-----------------+------+                  |
+  * |  1  |  1  |  c1 | null             |  null           |  3   |                  |
+  * +-----+-----+-----+------------------+-----------------+------+               ---+---
+  * |  1  |  2  |  c1 | hash(b) % 1024   |  null           |  1   |                  |
+  * +-----+-----+-----+------------------+-----------------+------+                  |
+  * |  1  |  2  |  c1 | null             |  hash(c) % 1024 |  2   |    records expanded by record2
+  * +-----+-----+-----+------------------+-----------------+------+                  |
+  * |  1  |  2  |  c1 | null             |  null           |  3   |                  |
+  * +-----+-----+-----+------------------+-----------------+------+               ---+---
+  * |  2  |  1  |  c2 | hash(b) % 1024   |  null           |  1   |                  |
+  * +-----+-----+-----+------------------+-----------------+------+                  |
+  * |  2  |  1  |  c2 | null             |  hash(c) % 1024 |  2   |    records expanded by record3
+  * +-----+-----+-----+------------------+-----------------+------+                  |
+  * |  2  |  1  |  c2 |  null            |  null           |  3   |                  |
+  * +-----+-----+-----+------------------+-----------------+------+               ---+---
   *
   * NOTES: this rule is only used for Stream now.
   */
@@ -166,12 +175,24 @@ class SplitAggregateRule extends RelOptRule(
     // STEP 2: construct partial aggregates
     val groupSetTreeSet = new util.TreeSet[ImmutableBitSet](ImmutableBitSet.ORDERING)
     val aggInfoToGroupSetMap = new util.HashMap[AggregateCall, ImmutableBitSet]()
+    var newGroupSetsNum = 0
     aggCalls.foreach { aggCall =>
       val groupSet = if (SplitAggregateRule.needAddHashFields(aggCall)) {
         val newIndexes = SplitAggregateRule.getArgIndexes(aggCall).map { argIndex =>
           hashFieldsMap.getOrElse(argIndex, argIndex).asInstanceOf[Integer]
         }.toSeq
-        ImmutableBitSet.of(newIndexes).union(ImmutableBitSet.of(aggGroupSet: _*))
+        val newGroupSet = ImmutableBitSet.of(newIndexes).union(ImmutableBitSet.of(aggGroupSet: _*))
+        // Only increment groupSet number if aggregate call needs add new different hash fields
+        // e.g SQL1: SELECT COUNT(DISTINCT a), MAX(a) FROM T group by b
+        // newGroupSetsNum is 1 because two agg function add same hash field
+        // e.g SQL2: SELECT COUNT(DISTINCT a), COUNT(b) FROM T group by c
+        // newGroupSetsNum is 1 because only COUNT(DISTINCT a) adds a new hash field
+        // e.g SQL3: SELECT COUNT(DISTINCT a), COUNT(DISTINCT b) FROM T group by b
+        // newGroupSetsNum is 2 because COUNT(DISTINCT a), COUNT(DISTINCT b) both add hash field
+        if (!groupSetTreeSet.contains(newGroupSet)) {
+          newGroupSetsNum += 1
+        }
+        newGroupSet
       } else {
         ImmutableBitSet.of(aggGroupSet: _*)
       }
@@ -195,7 +216,7 @@ class SplitAggregateRule extends RelOptRule(
       }
     }
 
-    val needExpand = groupSets.size() > 1
+    val needExpand = newGroupSetsNum > 1
     val duplicateFieldMap = if (needExpand) {
       val (duplicateFieldMap, _) = ExpandUtil.buildExpandNode(
         cluster, relBuilder, partialAggCalls, fullGroupSet, groupSets)
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index bc61839..4ca2d22 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -38,11 +38,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[SUM($4)])
-+- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $5], agg#1=[SUM($1) FILTER $6], agg#2=[SUM($1) FILTER $7])
-   +- FlinkLogicalCalc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
-      +- FlinkLogicalExpand(projects=[a, b, $f2, $f3, $f4, $e])
-         +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-            +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $2], agg#1=[SUM($1) FILTER $3], agg#2=[SUM($1) FILTER $2])
+   +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+      +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -78,18 +76,14 @@ LogicalProject(c=[$0], b=[$1], d=[$2], a=[$3], a0=[$4], b0=[$5], c0=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner])
-:- FlinkLogicalCalc(select=[$f2 AS c, CAST($f1) AS b, CAST($f1) AS d, $f2_0 AS a])
+:- FlinkLogicalCalc(select=[c, d AS b, d, a])
 :  +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($2)], agg#1=[$SUM0($3)])
-:     +- FlinkLogicalAggregate(group=[{2, 3}], agg#0=[SUM($1) FILTER $4], agg#1=[COUNT(DISTINCT $0) FILTER $5])
-:        +- FlinkLogicalCalc(select=[a, $f1, $f2, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
-:           +- FlinkLogicalExpand(projects=[a, $f1, $f2, $f3, $e])
-:              +- FlinkLogicalCalc(select=[a, $f1, $f2, MOD(HASH_CODE(a), 1024) AS $f3])
-:                 +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
-:                    +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4])
-:                       +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-:                          +- FlinkLogicalExpand(projects=[a, b, $f2, $e])
-:                             +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-:                                +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:     +- FlinkLogicalAggregate(group=[{2, 3}], agg#0=[SUM($1)], agg#1=[COUNT(DISTINCT $0)])
+:        +- FlinkLogicalCalc(select=[a, $f1, $f2, MOD(HASH_CODE(a), 1024) AS $f3])
+:           +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
+:              +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)], agg#1=[SUM($1)])
+:                 +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+:                    +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -169,6 +163,36 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM(
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testMultipleDistinctAggOnSameColumn">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  a,
+  COUNT(DISTINCT b),
+  COUNT(DISTINCT b) FILTER(WHERE b <> 5),
+  SUM(b),
+  AVG(b)
+FROM MyTable
+GROUP BY a
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[SUM($1)], EXPR$4=[AVG($1)])
++- LogicalProject(a=[$0], b=[$1], $f2=[IS TRUE(<>($1, 5))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, $f1, $f2, $f3, CAST(IF(=($f5, 0:BIGINT), null:INTEGER, /($f4, $f5))) AS $f4])
++- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[$SUM0($3)], agg#2=[SUM($4)], agg#3=[$SUM0($5)], agg#4=[$SUM0($6)])
+   +- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[COUNT(DISTINCT $1)], agg#1=[COUNT(DISTINCT $1) FILTER $2], agg#2=[SUM($1)], agg#3=[$SUM0($1)], agg#4=[COUNT($1)])
+      +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 5)) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
+         +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSingleDistinctAgg">
     <Resource name="sql">
       <![CDATA[SELECT COUNT(DISTINCT c) FROM MyTable]]>
@@ -231,11 +255,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
       <![CDATA[
 FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS $f3])
 +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
-   +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
-      +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-         +- FlinkLogicalExpand(projects=[a, b, $f2, $e])
-            +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-               +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)], agg#1=[SUM($1)], agg#2=[$SUM0($1)], agg#3=[COUNT($1)])
+      +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+         +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -357,11 +379,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
-   +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
-      +- FlinkLogicalExpand(projects=[a, b, c, $f3, $e])
-         +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-            +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2)], agg#1=[COUNT(DISTINCT $1)])
+   +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+      +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index 3ccc522..82f4622 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -98,13 +98,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
       <![CDATA[
 GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2) AS $f1, SUM_RETRACT($f3) AS $f2, SUM_RETRACT($f4_0) AS $f3])
 +- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(DISTINCT b) FILTER $g_0 AS $f2, SUM(b) FILTER $g_1 AS $f3, SUM(b) FILTER $g_10 AS $f4_0])
+   +- GroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(DISTINCT b) FILTER $f2 AS $f2, SUM(b) FILTER $f3 AS $f3, SUM(b) FILTER $f2 AS $f4_0])
       +- Exchange(distribution=[hash[a, $f4]])
-         +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
-            +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
-               +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+            +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+               +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -134,12 +132,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
    +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), SUM_RETRACT($f4_0) AS (sum$3, count$4), COUNT_RETRACT(*) AS count1$5])
       +- GlobalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 count$0) AS $f2, SUM(sum$1) AS $f3, SUM(sum$2) AS $f4_0])
          +- Exchange(distribution=[hash[a, $f4]])
-            +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
-               +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
-                  +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
-                     +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
+               +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -961,13 +957,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
 Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3])
 +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
    +- Exchange(distribution=[hash[a]])
-      +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
+      +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, SUM(b) AS $f3, $SUM0(b) AS $f4, COUNT(b) AS $f5])
          +- Exchange(distribution=[hash[a, $f2]])
-            +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
-               +- Expand(projects=[a, b, $f2, $e])
-                  +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -990,12 +984,10 @@ Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3]
       +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
          +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
             +- Exchange(distribution=[hash[a, $f2]])
-               +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
-                  +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
-                     +- Expand(projects=[a, b, $f2, $e])
-                        +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                              +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, $SUM0(b) AS sum$2, COUNT(b) AS count$3, DISTINCT(b) AS distinct$0])
+                  +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1157,15 +1149,13 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT()])
       <![CDATA[
 GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2], changelogMode=[I,UA,D])
 +- Exchange(distribution=[hash[a]], changelogMode=[I,UB,UA,D])
-   +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(DISTINCT b) FILTER $g_0 AS $f2_0, COUNT_RETRACT(*) FILTER $g_1 AS $f3], changelogMode=[I,UB,UA,D])
+   +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(DISTINCT b) AS $f2_0, COUNT_RETRACT(*) AS $f3], changelogMode=[I,UB,UA,D])
       +- Exchange(distribution=[hash[a, $f2]], changelogMode=[I,UB,UA])
-         +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
-            +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
-               +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
-                  +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], changelogMode=[I,UB,UA])
-                     +- Exchange(distribution=[hash[c]], changelogMode=[I])
-                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
-                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+         +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+            +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], changelogMode=[I,UB,UA])
+               +- Exchange(distribution=[hash[c]], changelogMode=[I])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
@@ -1196,15 +1186,13 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
    +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, $SUM0_RETRACT($f3) AS sum$1, COUNT_RETRACT(*) AS count1$2], changelogMode=[I])
       +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 count$0) AS $f2_0, COUNT_RETRACT(count1$1) AS $f3], changelogMode=[I,UB,UA,D])
          +- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
-            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], changelogMode=[I])
-               +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
-                  +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
-                     +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
-                        +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
-                           +- Exchange(distribution=[hash[c]], changelogMode=[I])
-                              +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
-                                 +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
-                                    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) AS count$0, COUNT_RETRACT(*) AS count1$1, DISTINCT(b) AS distinct$0], changelogMode=[I])
+               +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+                  +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
+                     +- Exchange(distribution=[hash[c]], changelogMode=[I])
+                        +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
+                           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+                              +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
@@ -1444,13 +1432,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
       <![CDATA[
 GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS $f2, COUNT(DISTINCT b) AS $f3_0])
       +- Exchange(distribution=[hash[a, $f3]])
-         +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
-            +- Expand(projects=[a, b, c, $f3, $e])
-               +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+            +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+               +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1472,12 +1458,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_R
    +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
       +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
          +- Exchange(distribution=[hash[a, $f3]])
-            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
-               +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
-                  +- Expand(projects=[a, b, c, $f3, $e])
-                     +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+               +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index f8167c1..b46d2d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -41,12 +41,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
 +- Exchange(distribution=[hash[a]])
    +- IncrementalGroupAggregate(partialAggGrouping=[a, $f4], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, SUM(sum$2) AS sum$2])
       +- Exchange(distribution=[hash[a, $f4]])
-         +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
-            +- Calc(select=[a, b, $f2, $f3, $f4, (($e = 0) AND $f2) AS $g_0, (($e = 1) AND $f3) AS $g_1, (($e = 1) AND $f2) AS $g_10])
-               +- Expand(projects=[a, b, $f2, $f3, $f4, $e])
-                  +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
+            +- Calc(select=[a, b, (b <> 2:BIGINT) IS TRUE AS $f2, (b <> 5:BIGINT) IS TRUE AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -291,12 +289,10 @@ Calc(select=[a, $f1, $f2, IF(($f4 = 0:BIGINT), null:BIGINT, ($f3 / $f4)) AS $f3]
    +- Exchange(distribution=[hash[a]])
       +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
          +- Exchange(distribution=[hash[a, $f2]])
-            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
-               +- Calc(select=[a, b, $f2, ($e = 0) AS $g_0, ($e = 1) AS $g_1])
-                  +- Expand(projects=[a, b, $f2, $e])
-                     +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, $SUM0(b) AS sum$2, COUNT(b) AS count$3, DISTINCT(b) AS distinct$0])
+               +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -322,19 +318,17 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT()])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(count$0) AS $f1, $SUM0_RETRACT(count1$1) AS $f2], changelogMode=[I,UA,D])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, $SUM0(count1$1) AS $f2], changelogMode=[I,UA,D])
 +- Exchange(distribution=[hash[a]], changelogMode=[I])
-   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0, COUNT_RETRACT(count1$1) AS count1$1, COUNT_RETRACT(count1$2) AS count1$2], changelogMode=[I])
+   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0, COUNT_RETRACT(count1$1) AS count1$1], changelogMode=[I])
       +- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
-         +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], changelogMode=[I])
-            +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1], changelogMode=[I,UB,UA])
-               +- Expand(projects=[a, b, $f2, $e], changelogMode=[I,UB,UA])
-                  +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
-                     +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
-                        +- Exchange(distribution=[hash[c]], changelogMode=[I])
-                           +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
-                              +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
-                                 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+         +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT_RETRACT(distinct$0 b) AS count$0, COUNT_RETRACT(*) AS count1$1, DISTINCT(b) AS distinct$0], changelogMode=[I])
+            +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], changelogMode=[I,UB,UA])
+               +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], changelogMode=[I,UB,UA])
+                  +- Exchange(distribution=[hash[c]], changelogMode=[I])
+                     +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], changelogMode=[I])
+                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I])
+                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
@@ -405,12 +399,10 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((
 +- Exchange(distribution=[hash[a]])
    +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
       +- Exchange(distribution=[hash[a, $f3]])
-         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
-            +- Calc(select=[a, b, c, $f3, ($e = 1) AS $g_1, ($e = 0) AS $g_0])
-               +- Expand(projects=[a, b, c, $f3, $e])
-                  +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+            +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
+               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 8f433c3..4dbce13 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit.Test
 
-/**
+/**IncrementalAggregateTest
   * Test for [[SplitAggregateRule]].
   */
 class SplitAggregateRuleTest extends TableTestBase {
@@ -168,4 +168,22 @@ class SplitAggregateRuleTest extends TableTestBase {
     val sqlQuery = "SELECT COUNT(DISTINCT c) FROM MyTable"
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testMultipleDistinctAggOnSameColumn(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    val sqlQuery =
+      s"""
+         |SELECT
+         |  a,
+         |  COUNT(DISTINCT b),
+         |  COUNT(DISTINCT b) FILTER(WHERE b <> 5),
+         |  SUM(b),
+         |  AVG(b)
+         |FROM MyTable
+         |GROUP BY a
+         |""".stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index c1c34c7..d799318 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -390,6 +390,28 @@ class SplitAggregateITCase(
     val expected = List("1,1,50", "1,ALL,50")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
+
+  @Test
+  def testMultipleDistinctAggOnSameColumn(): Unit = {
+    val t1 = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         |  a,
+         |  COUNT(DISTINCT b),
+         |  COUNT(DISTINCT b) filter (where not b = 2),
+         |  MAX(b) filter (where not b = 5),
+         |  MIN(b) filter (where not b = 2)
+         |FROM T
+         |GROUP BY a
+       """.stripMargin)
+
+    val sink = new TestingRetractSink
+    t1.toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List("1,2,1,2,1", "2,4,3,4,3", "3,1,1,null,5", "4,2,2,6,5")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
 }
 
 object SplitAggregateITCase {