You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/12 02:00:10 UTC
[flink] branch master updated: [FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0b65aea [FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval
0b65aea is described below
commit 0b65aeaede36f0bc706dfdb82f039352b15069e9
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Aug 3 13:01:29 2019 +0800
[FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval
This closes #9346
---
.../plan/metadata/FlinkRelMdColumnInterval.scala | 2 +-
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 153 +++++++++++++++++----
.../stream/sql/agg/IncrementalAggregateTest.xml | 26 ++++
.../stream/sql/agg/DistinctAggregateTest.scala | 5 +
4 files changed, 155 insertions(+), 31 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index c512689..8984636 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -553,7 +553,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
case agg: StreamExecGlobalGroupAggregate
if agg.globalAggInfoList.getActualAggregateCalls.length > aggCallIndex =>
val aggCallIndexInLocalAgg = getAggCallIndexInLocalAgg(
- aggCallIndex, agg.globalAggInfoList.getActualAggregateCalls, agg.getInput.getRowType)
+ aggCallIndex, agg.globalAggInfoList.getActualAggregateCalls, agg.inputRowType)
if (aggCallIndexInLocalAgg != null) {
return fmq.getColumnInterval(agg.getInput, groupSet.length + aggCallIndexInLocalAgg)
} else {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index d2b8388..2542251 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -1716,29 +1716,6 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
]]>
</Resource>
</TestCase>
- <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f1) AS $f1, $SUM0_RETRACT($f2) AS $f2])
-+- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, b])
- +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testSingleMaxWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
<![CDATA[
@@ -1769,6 +1746,27 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
]]>
</Resource>
</TestCase>
+ <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
<![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1791,6 +1789,54 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
]]>
</Resource>
</TestCase>
+ <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f1) AS $f1, $SUM0_RETRACT($f2) AS $f2])
++- Exchange(distribution=[hash[a]])
+ +- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT($f3_0) AS $f1, SUM_RETRACT($f4_0) AS $f2, $SUM0_RETRACT($f5) AS $f3])
++- Exchange(distribution=[hash[c]])
+ +- GroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(DISTINCT a) FILTER $g_1 AS $f3_0, SUM(a) FILTER $g_3 AS $f4_0, COUNT(DISTINCT b) FILTER $g_2 AS $f5])
+ +- Exchange(distribution=[hash[c, $f3, $f4]])
+ +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+ +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
<![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1816,25 +1862,72 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
]]>
</Resource>
</TestCase>
- <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+ <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
-+- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, b])
+GlobalGroupAggregate(groupBy=[c], select=[c, SUM(distinct$0 sum$0) AS EXPR$1, SUM(sum$1) AS EXPR$2, COUNT(distinct$1 count$2) AS EXPR$3])
++- Exchange(distribution=[hash[c]])
+ +- LocalGroupAggregate(groupBy=[c], select=[c, SUM(distinct$0 a) AS sum$0, SUM(a) AS sum$1, COUNT(distinct$1 b) AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT((sum$0, count$1)) AS $f1, SUM_RETRACT((sum$2, count$3)) AS $f2, $SUM0_RETRACT(sum$4) AS $f3])
++- Exchange(distribution=[hash[c]])
+ +- LocalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT($f3_0) AS (sum$0, count$1), SUM_RETRACT($f4_0) AS (sum$2, count$3), $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
+ +- GlobalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 sum$0) AS $f3_0, SUM(sum$1) AS $f4_0, COUNT(distinct$1 count$2) AS $f5])
+ +- Exchange(distribution=[hash[c, $f3, $f4]])
+ +- LocalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 a) FILTER $g_1 AS sum$0, SUM(a) FILTER $g_3 AS sum$1, COUNT(distinct$1 b) FILTER $g_2 AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+ +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+ +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[c], select=[c, SUM(DISTINCT a) AS EXPR$1, SUM(a) AS EXPR$2, COUNT(DISTINCT b) AS EXPR$3])
++- Exchange(distribution=[hash[c]])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 01dcc34..48376b2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -507,4 +507,30 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
]]>
</Resource>
</TestCase>
+ <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM(sum$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(count$2) AS $f3])
++- Exchange(distribution=[hash[c]])
+ +- IncrementalGroupAggregate(partialAggGrouping=[c, $f3, $f4], finalAggGrouping=[c], select=[c, SUM(distinct$0 sum$0) AS sum$0, SUM(sum$1) AS sum$1, COUNT(distinct$1 count$2) AS count$2])
+ +- Exchange(distribution=[hash[c, $f3, $f4]])
+ +- LocalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 a) FILTER $g_1 AS sum$0, SUM(a) FILTER $g_3 AS sum$1, COUNT(distinct$1 b) FILTER $g_2 AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+ +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+ +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+ +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 5cbbab4..7fbf0be 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,6 +103,11 @@ class DistinctAggregateTest(
}
@Test
+ def testTwoDistinctAggregateWithNonDistinctAgg(): Unit = {
+ util.verifyPlan("SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c")
+ }
+
+ @Test
def testSingleDistinctAggWithGroupBy(): Unit = {
util.verifyPlan("SELECT a, COUNT(DISTINCT c) FROM MyTable GROUP BY a")
}