You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:26 UTC
[flink] 12/13: [FLINK-14338][table-planner-blink] Update files due
to CALCITE-1824
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5fef3c1272e5be6ca20ed83f87c147a2d18379b0
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 21:34:17 2020 +0800
[FLINK-14338][table-planner-blink] Update files due to CALCITE-1824
* GROUP_ID translation was fixed
---
.../logical/DecomposeGroupingSetsRuleTest.xml | 61 ++++++++++------
.../plan/stream/sql/agg/GroupingSetsTest.scala | 3 +
.../runtime/batch/sql/agg/GroupingSetsITCase.scala | 11 +--
.../runtime/batch/sql/GroupingSetsITCase.java | 83 +++++++++++++---------
.../api/batch/sql/DistinctAggregateTest.scala | 19 +----
.../table/api/batch/sql/GroupingSetsTest.scala | 47 ++++--------
.../table/runtime/batch/sql/AggregateITCase.scala | 31 +++++---
7 files changed, 132 insertions(+), 123 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
index ca3284e..d656c98 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
@@ -24,16 +24,27 @@ SELECT a, GROUP_ID() AS g, COUNT(*) as c FROM MyTable GROUP BY GROUPING SETS (a,
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], groups=[[{0}, {}]], g=[GROUP_ID()], c=[COUNT()])
-+- LogicalProject(a=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0], g=[0:BIGINT], c=[$1])
+: +- LogicalAggregate(group=[{0}], groups=[[{0}, {}]], c=[COUNT()])
+: +- LogicalProject(a=[$0])
+: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], g=[1:BIGINT], c=[$1])
+ +- LogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
+ +- LogicalProject(a=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
-+- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
- +- FlinkLogicalExpand(projects=[{a=[$0], $e=[0]}, {a=[null], $e=[1]}])
+FlinkLogicalUnion(all=[true])
+:- FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
+: +- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
+: +- FlinkLogicalExpand(projects=[a, $e])
+: +- FlinkLogicalCalc(select=[a])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[a, 1:BIGINT AS g, c])
+ +- FlinkLogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
+- FlinkLogicalCalc(select=[a])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
@@ -56,16 +67,17 @@ FROM MyTable
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+ +- LogicalProject(b=[$1], c=[$2], a=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT) [...]
+- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
- +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+ +- FlinkLogicalExpand(projects=[a, b, c, $e])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -84,9 +96,10 @@ FROM MyTable
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], a=[AVG($1)], g=[GROUP_ID()], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
-+- LogicalProject(b=[$1], a=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], a=[$1], g=[0:BIGINT], gb=[$2], gib=[$3])
++- LogicalAggregate(group=[{0}], a=[AVG($1)], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
+ +- LogicalProject(b=[$1], a=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -107,16 +120,17 @@ GROUP BY GROUPING SETS (b, c)
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)], g=[GROUP_ID()])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)])
+ +- LogicalProject(b=[$1], c=[$2], a=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g])
+- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
- +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}])
+ +- FlinkLogicalExpand(projects=[a, b, c, $e])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -139,7 +153,7 @@ LogicalProject(a=[$3], b=[$4], c=[$5])
<![CDATA[
FlinkLogicalCalc(select=[a_0 AS a, b_0 AS b, c_0 AS c])
+- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], a=[COUNT($0)], b=[COUNT($4)], c=[COUNT($5)])
- +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1], b_0=[$1], c_0=[$2]}, {a=[$0], b=[null], c=[$2], $e=[2], b_0=[$1], c_0=[$2]}])
+ +- FlinkLogicalExpand(projects=[a, b, c, $e, b_0, c_0])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -161,16 +175,17 @@ FROM MyTable
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+ +- LogicalProject(b=[$1], c=[$2], a=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT) AS gid])
+- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
- +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+ +- FlinkLogicalExpand(projects=[a, b, c, $e])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -193,7 +208,7 @@ LogicalProject(b=[$2], c=[$3])
<![CDATA[
FlinkLogicalCalc(select=[b_0 AS b, c_0 AS c])
+- FlinkLogicalAggregate(group=[{0, 1, 2}], b=[COUNT($3)], c=[COUNT($4)])
- +- FlinkLogicalExpand(projects=[{b=[$0], c=[null], $e=[1], b_0=[$0], c_0=[$1]}, {b=[null], c=[$1], $e=[2], b_0=[$0], c_0=[$1]}])
+ +- FlinkLogicalExpand(projects=[b, c, $e, b_0, c_0])
+- FlinkLogicalCalc(select=[b, c])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
index b371e81..652b8a0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.stream.sql.agg
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
@@ -354,6 +355,8 @@ class GroupingSetsTest extends TableTestBase {
@Test
def testCALCITE1824(): Unit = {
+ expectedException.expect(classOf[TableException])
+ expectedException.expectMessage("GROUPING SETS are currently not supported")
val sqlQuery =
"""
|SELECT deptno, GROUP_ID() AS g, COUNT(*) AS c
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
index 584d073..a9c1be6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -395,13 +395,16 @@ class GroupingSetsITCase extends BatchTestBase {
@Test
def testCALCITE1824(): Unit = {
- // TODO:
- // When "[CALCITE-1824] GROUP_ID returns wrong result" is fixed,
- // there will be an extra row (null, 1, 14).
checkResult(
"select deptno, group_id() as g, count(*) as c " +
"from scott_emp group by grouping sets (deptno, (), ())",
- Seq(row(10, 0, 3), row(20, 0, 5), row(30, 0, 6), row(null, 0, 14))
+ Seq(row(10, 0, 3),
+ row(10, 1, 3),
+ row(20, 0, 5),
+ row(20, 1, 5),
+ row(30, 0, 6),
+ row(30, 1, 6),
+ row(null, 0, 14))
)
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index a9fd29b..c73eaff 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -89,33 +89,33 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
" GROUP BY GROUPING SETS (f1, f2, ())";
String expected =
- "1,null,1,1,1,0,1,0,2,1\n" +
- "6,null,18,1,1,0,1,0,2,6\n" +
- "2,null,2,1,1,0,1,0,2,2\n" +
- "4,null,8,1,1,0,1,0,2,4\n" +
- "5,null,13,1,1,0,1,0,2,5\n" +
- "3,null,5,1,1,0,1,0,2,3\n" +
- "null,Comment#11,17,2,0,1,0,1,1,1\n" +
- "null,Comment#8,14,2,0,1,0,1,1,1\n" +
- "null,Comment#2,8,2,0,1,0,1,1,1\n" +
- "null,Comment#1,7,2,0,1,0,1,1,1\n" +
- "null,Comment#14,20,2,0,1,0,1,1,1\n" +
- "null,Comment#7,13,2,0,1,0,1,1,1\n" +
- "null,Comment#6,12,2,0,1,0,1,1,1\n" +
- "null,Comment#3,9,2,0,1,0,1,1,1\n" +
- "null,Comment#12,18,2,0,1,0,1,1,1\n" +
- "null,Comment#5,11,2,0,1,0,1,1,1\n" +
- "null,Comment#15,21,2,0,1,0,1,1,1\n" +
- "null,Comment#4,10,2,0,1,0,1,1,1\n" +
- "null,Hi,1,2,0,1,0,1,1,1\n" +
- "null,Comment#10,16,2,0,1,0,1,1,1\n" +
- "null,Hello world,3,2,0,1,0,1,1,1\n" +
- "null,I am fine.,5,2,0,1,0,1,1,1\n" +
- "null,Hello world, how are you?,4,2,0,1,0,1,1,1\n" +
- "null,Comment#9,15,2,0,1,0,1,1,1\n" +
- "null,Comment#13,19,2,0,1,0,1,1,1\n" +
- "null,Luke Skywalker,6,2,0,1,0,1,1,1\n" +
- "null,Hello,2,2,0,1,0,1,1,1\n" +
+ "1,null,1,0,1,0,1,0,2,1\n" +
+ "2,null,2,0,1,0,1,0,2,2\n" +
+ "3,null,5,0,1,0,1,0,2,3\n" +
+ "4,null,8,0,1,0,1,0,2,4\n" +
+ "5,null,13,0,1,0,1,0,2,5\n" +
+ "6,null,18,0,1,0,1,0,2,6\n" +
+ "null,Comment#1,7,0,0,1,0,1,1,1\n" +
+ "null,Comment#10,16,0,0,1,0,1,1,1\n" +
+ "null,Comment#11,17,0,0,1,0,1,1,1\n" +
+ "null,Comment#12,18,0,0,1,0,1,1,1\n" +
+ "null,Comment#13,19,0,0,1,0,1,1,1\n" +
+ "null,Comment#14,20,0,0,1,0,1,1,1\n" +
+ "null,Comment#15,21,0,0,1,0,1,1,1\n" +
+ "null,Comment#2,8,0,0,1,0,1,1,1\n" +
+ "null,Comment#3,9,0,0,1,0,1,1,1\n" +
+ "null,Comment#4,10,0,0,1,0,1,1,1\n" +
+ "null,Comment#5,11,0,0,1,0,1,1,1\n" +
+ "null,Comment#6,12,0,0,1,0,1,1,1\n" +
+ "null,Comment#7,13,0,0,1,0,1,1,1\n" +
+ "null,Comment#8,14,0,0,1,0,1,1,1\n" +
+ "null,Comment#9,15,0,0,1,0,1,1,1\n" +
+ "null,Hello world, how are you?,4,0,0,1,0,1,1,1\n" +
+ "null,Hello world,3,0,0,1,0,1,1,1\n" +
+ "null,Hello,2,0,0,1,0,1,1,1\n" +
+ "null,Hi,1,0,0,1,0,1,1,1\n" +
+ "null,I am fine.,5,0,0,1,0,1,1,1\n" +
+ "null,Luke Skywalker,6,0,0,1,0,1,1,1\n" +
"null,null,11,0,0,0,0,0,0,21";
checkSql(query, expected);
@@ -128,14 +128,27 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
" GROUP BY GROUPING SETS (f1, f2)";
String expected =
- "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
- "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
- "null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" +
- "null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" +
- "null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" +
- "null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" +
- "null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" +
- "null,Comment#1,7,2";
+ "1,Hi,1,0\n" +
+ "2,Hello,2,0\n" +
+ "2,null,3,0\n" +
+ "3,I am fine.,5,0\n" +
+ "3,Luke Skywalker,6,0\n" +
+ "3,null,4,0\n" +
+ "4,Comment#1,7,0\n" +
+ "4,Comment#2,8,0\n" +
+ "4,Comment#3,9,0\n" +
+ "4,Comment#4,10,0\n" +
+ "5,Comment#5,11,0\n" +
+ "5,Comment#6,12,0\n" +
+ "5,Comment#7,13,0\n" +
+ "5,Comment#8,14,0\n" +
+ "5,Comment#9,15,0\n" +
+ "6,Comment#10,16,0\n" +
+ "6,Comment#11,17,0\n" +
+ "6,Comment#12,18,0\n" +
+ "6,Comment#13,19,0\n" +
+ "6,Comment#14,20,0\n" +
+ "6,Comment#15,21,0";
checkSql(query, expected);
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
index d3c2795..ff5e560 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
@@ -57,13 +57,7 @@ class DistinctAggregateTest extends TableTestBase {
val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
- val left = unaryNode("DataSetAggregate",
- unaryNode("DataSetCalc",
- batchTableNode(table),
- term("select", "a")),
- term("select", "MAX(a) AS EXPR$2"))
-
- val right = unaryNode(
+ val expected = unaryNode(
"DataSetAggregate",
unaryNode(
"DataSetDistinct",
@@ -74,18 +68,9 @@ class DistinctAggregateTest extends TableTestBase {
),
term("distinct", "a")
),
- term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1")
+ term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
)
- val expected = unaryNode("DataSetCalc",
- binaryNode("DataSetSingleRowJoin",
- left,
- right,
- term("where", "true"),
- term("join", "EXPR$2", "EXPR$0", "EXPR$1"),
- term("joinType", "NestedLoopInnerJoin")),
- term("select", "EXPR$0", "EXPR$1", "EXPR$2"))
-
util.verifySql(sqlQuery, expected)
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
index 7b811aa..59faa2c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -34,38 +34,15 @@ class GroupingSetsTest extends TableTestBase {
val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
"GROUP BY GROUPING SETS (b, c)"
- val aggregate = binaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(table),
- term("select", "b", "a")
- ),
- term("groupBy", "b"),
- term("select", "b", "AVG(a) AS a")
- ),
- term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g")
- ),
+ val aggregate = unaryNode(
+ "DataSetCalc",
unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(table),
- term("select", "c", "a")
- ),
- term("groupBy", "c"),
- term("select", "c", "AVG(a) AS a")
- ),
- term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g")
+ "DataSetAggregate",
+ batchTableNode(table),
+ term("groupBy", "b", "c"),
+ term("select", "b", "c", "AVG(a) AS a")
),
- term("all", "true"),
- term("union", "b", "c", "a", "g")
+ term("select", "b", "c", "a", "0:BIGINT AS g")
)
util.verifySql(sqlQuery, aggregate)
@@ -91,7 +68,7 @@ class GroupingSetsTest extends TableTestBase {
term("groupBy", "b", "c"),
term("select", "b", "c", "AVG(a) AS a")
),
- term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+ term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
"1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
)
@@ -107,7 +84,7 @@ class GroupingSetsTest extends TableTestBase {
term("groupBy", "b"),
term("select", "b", "AVG(a) AS a")
),
- term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+ term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
"0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
)
@@ -123,7 +100,7 @@ class GroupingSetsTest extends TableTestBase {
term("groupBy", "c"),
term("select", "c", "AVG(a) AS a")
),
- term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g", "0:BIGINT AS gb",
+ term("select", "null:BIGINT AS b", "c", "a", "0:BIGINT AS g", "0:BIGINT AS gb",
"1:BIGINT AS gc", "0:BIGINT AS gib", "1:BIGINT AS gic", "1:BIGINT AS gid")
)
@@ -185,7 +162,7 @@ class GroupingSetsTest extends TableTestBase {
term("groupBy", "b", "c"),
term("select", "b", "c", "AVG(a) AS a")
),
- term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+ term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
"1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
)
@@ -201,7 +178,7 @@ class GroupingSetsTest extends TableTestBase {
term("groupBy", "b"),
term("select", "b", "AVG(a) AS a")
),
- term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+ term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
"0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index d2bc5aa..f7d0102 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMod
import org.apache.flink.table.utils.NonMergableCount
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -232,7 +233,6 @@ class AggregateITCase(
@Test
def testGroupingSetAggregate(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env, config)
@@ -245,14 +245,27 @@ class AggregateITCase(
val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected =
- "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
- "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
- "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" +
- "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" +
- "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" +
- "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" +
- "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" +
- "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"
+ "1,Hi,1,0\n" +
+ "2,Hello world,3,0\n" +
+ "2,Hello,2,0\n" +
+ "3,Hello world, how are you?,4,0\n" +
+ "3,I am fine.,5,0\n" +
+ "3,Luke Skywalker,6,0\n" +
+ "4,Comment#1,7,0\n" +
+ "4,Comment#2,8,0\n" +
+ "4,Comment#3,9,0\n" +
+ "4,Comment#4,10,0\n" +
+ "5,Comment#5,11,0\n" +
+ "5,Comment#6,12,0\n" +
+ "5,Comment#7,13,0\n" +
+ "5,Comment#8,14,0\n" +
+ "5,Comment#9,15,0\n" +
+ "6,Comment#10,16,0\n" +
+ "6,Comment#11,17,0\n" +
+ "6,Comment#12,18,0\n" +
+ "6,Comment#13,19,0\n" +
+ "6,Comment#14,20,0\n" +
+ "6,Comment#15,21,0"
TestBaseUtils.compareResultAsText(result.asJava, expected)
}