You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text export.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:26 UTC

[flink] 12/13: [FLINK-14338][table-planner-blink] Update files due to CALCITE-1824

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5fef3c1272e5be6ca20ed83f87c147a2d18379b0
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 21:34:17 2020 +0800

    [FLINK-14338][table-planner-blink] Update files due to CALCITE-1824
    
    * GROUP_ID translation was fixed
---
 .../logical/DecomposeGroupingSetsRuleTest.xml      | 61 ++++++++++------
 .../plan/stream/sql/agg/GroupingSetsTest.scala     |  3 +
 .../runtime/batch/sql/agg/GroupingSetsITCase.scala | 11 +--
 .../runtime/batch/sql/GroupingSetsITCase.java      | 83 +++++++++++++---------
 .../api/batch/sql/DistinctAggregateTest.scala      | 19 +----
 .../table/api/batch/sql/GroupingSetsTest.scala     | 47 ++++--------
 .../table/runtime/batch/sql/AggregateITCase.scala  | 31 +++++---
 7 files changed, 132 insertions(+), 123 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
index ca3284e..d656c98 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
@@ -24,16 +24,27 @@ SELECT a, GROUP_ID() AS g, COUNT(*) as c FROM MyTable GROUP BY GROUPING SETS (a,
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], groups=[[{0}, {}]], g=[GROUP_ID()], c=[COUNT()])
-+- LogicalProject(a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0], g=[0:BIGINT], c=[$1])
+:  +- LogicalAggregate(group=[{0}], groups=[[{0}, {}]], c=[COUNT()])
+:     +- LogicalProject(a=[$0])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], g=[1:BIGINT], c=[$1])
+   +- LogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
+      +- LogicalProject(a=[$0])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
-+- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
-   +- FlinkLogicalExpand(projects=[{a=[$0], $e=[0]}, {a=[null], $e=[1]}])
+FlinkLogicalUnion(all=[true])
+:- FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
+:  +- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
+:     +- FlinkLogicalExpand(projects=[a, $e])
+:        +- FlinkLogicalCalc(select=[a])
+:           +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[a, 1:BIGINT AS g, c])
+   +- FlinkLogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
       +- FlinkLogicalCalc(select=[a])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -56,16 +67,17 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT) [...]
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -84,9 +96,10 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], a=[AVG($1)], g=[GROUP_ID()], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
-+- LogicalProject(b=[$1], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], a=[$1], g=[0:BIGINT], gb=[$2], gib=[$3])
++- LogicalAggregate(group=[{0}], a=[AVG($1)], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
+   +- LogicalProject(b=[$1], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -107,16 +120,17 @@ GROUP BY GROUPING SETS (b, c)
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)], g=[GROUP_ID()])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g])
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -139,7 +153,7 @@ LogicalProject(a=[$3], b=[$4], c=[$5])
       <![CDATA[
 FlinkLogicalCalc(select=[a_0 AS a, b_0 AS b, c_0 AS c])
 +- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], a=[COUNT($0)], b=[COUNT($4)], c=[COUNT($5)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1], b_0=[$1], c_0=[$2]}, {a=[$0], b=[null], c=[$2], $e=[2], b_0=[$1], c_0=[$2]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e, b_0, c_0])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -161,16 +175,17 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT) AS gid])
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -193,7 +208,7 @@ LogicalProject(b=[$2], c=[$3])
       <![CDATA[
 FlinkLogicalCalc(select=[b_0 AS b, c_0 AS c])
 +- FlinkLogicalAggregate(group=[{0, 1, 2}], b=[COUNT($3)], c=[COUNT($4)])
-   +- FlinkLogicalExpand(projects=[{b=[$0], c=[null], $e=[1], b_0=[$0], c_0=[$1]}, {b=[null], c=[$1], $e=[2], b_0=[$0], c_0=[$1]}])
+   +- FlinkLogicalExpand(projects=[b, c, $e, b_0, c_0])
       +- FlinkLogicalCalc(select=[b, c])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
index b371e81..652b8a0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.stream.sql.agg
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
@@ -354,6 +355,8 @@ class GroupingSetsTest extends TableTestBase {
 
   @Test
   def testCALCITE1824(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("GROUPING SETS are currently not supported")
     val sqlQuery =
       """
         |SELECT deptno, GROUP_ID() AS g, COUNT(*) AS c
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
index 584d073..a9c1be6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -395,13 +395,16 @@ class GroupingSetsITCase extends BatchTestBase {
 
   @Test
   def testCALCITE1824(): Unit = {
-    // TODO:
-    // When "[CALCITE-1824] GROUP_ID returns wrong result" is fixed,
-    // there will be an extra row (null, 1, 14).
     checkResult(
       "select deptno, group_id() as g, count(*) as c " +
         "from scott_emp group by grouping sets (deptno, (), ())",
-      Seq(row(10, 0, 3), row(20, 0, 5), row(30, 0, 6), row(null, 0, 14))
+      Seq(row(10, 0, 3),
+        row(10, 1, 3),
+        row(20, 0, 5),
+        row(20, 1, 5),
+        row(30, 0, 6),
+        row(30, 1, 6),
+        row(null, 0, 14))
     )
   }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index a9fd29b..c73eaff 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -89,33 +89,33 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 				" GROUP BY GROUPING SETS (f1, f2, ())";
 
 		String expected =
-			"1,null,1,1,1,0,1,0,2,1\n" +
-			"6,null,18,1,1,0,1,0,2,6\n" +
-			"2,null,2,1,1,0,1,0,2,2\n" +
-			"4,null,8,1,1,0,1,0,2,4\n" +
-			"5,null,13,1,1,0,1,0,2,5\n" +
-			"3,null,5,1,1,0,1,0,2,3\n" +
-			"null,Comment#11,17,2,0,1,0,1,1,1\n" +
-			"null,Comment#8,14,2,0,1,0,1,1,1\n" +
-			"null,Comment#2,8,2,0,1,0,1,1,1\n" +
-			"null,Comment#1,7,2,0,1,0,1,1,1\n" +
-			"null,Comment#14,20,2,0,1,0,1,1,1\n" +
-			"null,Comment#7,13,2,0,1,0,1,1,1\n" +
-			"null,Comment#6,12,2,0,1,0,1,1,1\n" +
-			"null,Comment#3,9,2,0,1,0,1,1,1\n" +
-			"null,Comment#12,18,2,0,1,0,1,1,1\n" +
-			"null,Comment#5,11,2,0,1,0,1,1,1\n" +
-			"null,Comment#15,21,2,0,1,0,1,1,1\n" +
-			"null,Comment#4,10,2,0,1,0,1,1,1\n" +
-			"null,Hi,1,2,0,1,0,1,1,1\n" +
-			"null,Comment#10,16,2,0,1,0,1,1,1\n" +
-			"null,Hello world,3,2,0,1,0,1,1,1\n" +
-			"null,I am fine.,5,2,0,1,0,1,1,1\n" +
-			"null,Hello world, how are you?,4,2,0,1,0,1,1,1\n" +
-			"null,Comment#9,15,2,0,1,0,1,1,1\n" +
-			"null,Comment#13,19,2,0,1,0,1,1,1\n" +
-			"null,Luke Skywalker,6,2,0,1,0,1,1,1\n" +
-			"null,Hello,2,2,0,1,0,1,1,1\n" +
+			"1,null,1,0,1,0,1,0,2,1\n" +
+			"2,null,2,0,1,0,1,0,2,2\n" +
+			"3,null,5,0,1,0,1,0,2,3\n" +
+			"4,null,8,0,1,0,1,0,2,4\n" +
+			"5,null,13,0,1,0,1,0,2,5\n" +
+			"6,null,18,0,1,0,1,0,2,6\n" +
+			"null,Comment#1,7,0,0,1,0,1,1,1\n" +
+			"null,Comment#10,16,0,0,1,0,1,1,1\n" +
+			"null,Comment#11,17,0,0,1,0,1,1,1\n" +
+			"null,Comment#12,18,0,0,1,0,1,1,1\n" +
+			"null,Comment#13,19,0,0,1,0,1,1,1\n" +
+			"null,Comment#14,20,0,0,1,0,1,1,1\n" +
+			"null,Comment#15,21,0,0,1,0,1,1,1\n" +
+			"null,Comment#2,8,0,0,1,0,1,1,1\n" +
+			"null,Comment#3,9,0,0,1,0,1,1,1\n" +
+			"null,Comment#4,10,0,0,1,0,1,1,1\n" +
+			"null,Comment#5,11,0,0,1,0,1,1,1\n" +
+			"null,Comment#6,12,0,0,1,0,1,1,1\n" +
+			"null,Comment#7,13,0,0,1,0,1,1,1\n" +
+			"null,Comment#8,14,0,0,1,0,1,1,1\n" +
+			"null,Comment#9,15,0,0,1,0,1,1,1\n" +
+			"null,Hello world, how are you?,4,0,0,1,0,1,1,1\n" +
+			"null,Hello world,3,0,0,1,0,1,1,1\n" +
+			"null,Hello,2,0,0,1,0,1,1,1\n" +
+			"null,Hi,1,0,0,1,0,1,1,1\n" +
+			"null,I am fine.,5,0,0,1,0,1,1,1\n" +
+			"null,Luke Skywalker,6,0,0,1,0,1,1,1\n" +
 			"null,null,11,0,0,0,0,0,0,21";
 
 		checkSql(query, expected);
@@ -128,14 +128,27 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 				" GROUP BY GROUPING SETS (f1, f2)";
 
 		String expected =
-			"6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
-				"null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
-				"null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" +
-				"null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" +
-				"null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" +
-				"null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" +
-				"null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" +
-				"null,Comment#1,7,2";
+			"1,Hi,1,0\n" +
+			"2,Hello,2,0\n" +
+			"2,null,3,0\n" +
+			"3,I am fine.,5,0\n" +
+			"3,Luke Skywalker,6,0\n" +
+			"3,null,4,0\n" +
+			"4,Comment#1,7,0\n" +
+			"4,Comment#2,8,0\n" +
+			"4,Comment#3,9,0\n" +
+			"4,Comment#4,10,0\n" +
+			"5,Comment#5,11,0\n" +
+			"5,Comment#6,12,0\n" +
+			"5,Comment#7,13,0\n" +
+			"5,Comment#8,14,0\n" +
+			"5,Comment#9,15,0\n" +
+			"6,Comment#10,16,0\n" +
+			"6,Comment#11,17,0\n" +
+			"6,Comment#12,18,0\n" +
+			"6,Comment#13,19,0\n" +
+			"6,Comment#14,20,0\n" +
+			"6,Comment#15,21,0";
 
 		checkSql(query, expected);
 	}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
index d3c2795..ff5e560 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
@@ -57,13 +57,7 @@ class DistinctAggregateTest extends TableTestBase {
 
     val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
 
-    val left = unaryNode("DataSetAggregate",
-      unaryNode("DataSetCalc",
-        batchTableNode(table),
-        term("select", "a")),
-      term("select", "MAX(a) AS EXPR$2"))
-
-    val right = unaryNode(
+    val expected = unaryNode(
       "DataSetAggregate",
       unaryNode(
         "DataSetDistinct",
@@ -74,18 +68,9 @@ class DistinctAggregateTest extends TableTestBase {
         ),
         term("distinct", "a")
       ),
-      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1")
+      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
     )
 
-    val expected = unaryNode("DataSetCalc",
-      binaryNode("DataSetSingleRowJoin",
-        left,
-        right,
-        term("where", "true"),
-        term("join", "EXPR$2", "EXPR$0", "EXPR$1"),
-        term("joinType", "NestedLoopInnerJoin")),
-      term("select", "EXPR$0", "EXPR$1", "EXPR$2"))
-
     util.verifySql(sqlQuery, expected)
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
index 7b811aa..59faa2c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -34,38 +34,15 @@ class GroupingSetsTest extends TableTestBase {
     val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
       "GROUP BY GROUPING SETS (b, c)"
 
-    val aggregate = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(table),
-            term("select", "b", "a")
-          ),
-          term("groupBy", "b"),
-          term("select", "b", "AVG(a) AS a")
-        ),
-        term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g")
-      ),
+    val aggregate = unaryNode(
+      "DataSetCalc",
       unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(table),
-            term("select", "c", "a")
-          ),
-          term("groupBy", "c"),
-          term("select", "c", "AVG(a) AS a")
-        ),
-        term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g")
+        "DataSetAggregate",
+        batchTableNode(table),
+        term("groupBy", "b", "c"),
+        term("select", "b", "c", "AVG(a) AS a")
       ),
-      term("all", "true"),
-      term("union", "b", "c", "a", "g")
+      term("select", "b", "c", "a", "0:BIGINT AS g")
     )
 
     util.verifySql(sqlQuery, aggregate)
@@ -91,7 +68,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b", "c"),
         term("select", "b", "c", "AVG(a) AS a")
       ),
-      term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+      term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
         "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
     )
 
@@ -107,7 +84,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b"),
         term("select", "b", "AVG(a) AS a")
       ),
-      term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+      term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
         "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
     )
 
@@ -123,7 +100,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "c"),
         term("select", "c", "AVG(a) AS a")
       ),
-      term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g", "0:BIGINT AS gb",
+      term("select", "null:BIGINT AS b", "c", "a", "0:BIGINT AS g", "0:BIGINT AS gb",
         "1:BIGINT AS gc", "0:BIGINT AS gib", "1:BIGINT AS gic", "1:BIGINT AS gid")
     )
 
@@ -185,7 +162,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b", "c"),
         term("select", "b", "c", "AVG(a) AS a")
       ),
-      term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+      term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
         "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
     )
 
@@ -201,7 +178,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b"),
         term("select", "b", "AVG(a) AS a")
       ),
-      term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+      term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
         "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
     )
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index d2bc5aa..f7d0102 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMod
 import org.apache.flink.table.utils.NonMergableCount
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -232,7 +233,6 @@ class AggregateITCase(
 
   @Test
   def testGroupingSetAggregate(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env, config)
 
@@ -245,14 +245,27 @@ class AggregateITCase(
     val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected =
-      "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
-        "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
-        "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" +
-        "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" +
-        "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" +
-        "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" +
-        "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" +
-        "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"
+      "1,Hi,1,0\n" +
+        "2,Hello world,3,0\n" +
+        "2,Hello,2,0\n" +
+        "3,Hello world, how are you?,4,0\n" +
+        "3,I am fine.,5,0\n" +
+        "3,Luke Skywalker,6,0\n" +
+        "4,Comment#1,7,0\n" +
+        "4,Comment#2,8,0\n" +
+        "4,Comment#3,9,0\n" +
+        "4,Comment#4,10,0\n" +
+        "5,Comment#5,11,0\n" +
+        "5,Comment#6,12,0\n" +
+        "5,Comment#7,13,0\n" +
+        "5,Comment#8,14,0\n" +
+        "5,Comment#9,15,0\n" +
+        "6,Comment#10,16,0\n" +
+        "6,Comment#11,17,0\n" +
+        "6,Comment#12,18,0\n" +
+        "6,Comment#13,19,0\n" +
+        "6,Comment#14,20,0\n" +
+        "6,Comment#15,21,0"
 
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }