You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/12 02:00:10 UTC

[flink] branch master updated: [FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b65aea  [FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval
0b65aea is described below

commit 0b65aeaede36f0bc706dfdb82f039352b15069e9
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Aug 3 13:01:29 2019 +0800

    [FLINK-13562][table-planner-blink] Fix incorrect input type for local stream group aggregate in FlinkRelMdColumnInterval
    
    This closes #9346
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |   2 +-
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 153 +++++++++++++++++----
 .../stream/sql/agg/IncrementalAggregateTest.xml    |  26 ++++
 .../stream/sql/agg/DistinctAggregateTest.scala     |   5 +
 4 files changed, 155 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index c512689..8984636 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -553,7 +553,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
           case agg: StreamExecGlobalGroupAggregate
             if agg.globalAggInfoList.getActualAggregateCalls.length > aggCallIndex =>
             val aggCallIndexInLocalAgg = getAggCallIndexInLocalAgg(
-              aggCallIndex, agg.globalAggInfoList.getActualAggregateCalls, agg.getInput.getRowType)
+              aggCallIndex, agg.globalAggInfoList.getActualAggregateCalls, agg.inputRowType)
             if (aggCallIndexInLocalAgg != null) {
               return fmq.getColumnInterval(agg.getInput, groupSet.length + aggCallIndexInLocalAgg)
             } else {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index d2b8388..2542251 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -1716,29 +1716,6 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f1) AS $f1, $SUM0_RETRACT($f2) AS $f2])
-+- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
-      +- Exchange(distribution=[hash[a]])
-         +- Calc(select=[a, b])
-            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testSingleMaxWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
       <![CDATA[
@@ -1769,6 +1746,27 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, b])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1791,6 +1789,54 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f1) AS $f1, $SUM0_RETRACT($f2) AS $f2])
++- Exchange(distribution=[hash[a]])
+   +- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
+      +- Exchange(distribution=[hash[a]])
+         +- Calc(select=[a, b])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT($f3_0) AS $f1, SUM_RETRACT($f4_0) AS $f2, $SUM0_RETRACT($f5) AS $f3])
++- Exchange(distribution=[hash[c]])
+   +- GroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(DISTINCT a) FILTER $g_1 AS $f3_0, SUM(a) FILTER $g_3 AS $f4_0, COUNT(DISTINCT b) FILTER $g_2 AS $f5])
+      +- Exchange(distribution=[hash[c, $f3, $f4]])
+         +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+            +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+               +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1816,25 +1862,72 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
-+- Exchange(distribution=[hash[a]])
-   +- Calc(select=[a, b])
+GlobalGroupAggregate(groupBy=[c], select=[c, SUM(distinct$0 sum$0) AS EXPR$1, SUM(sum$1) AS EXPR$2, COUNT(distinct$1 count$2) AS EXPR$3])
++- Exchange(distribution=[hash[c]])
+   +- LocalGroupAggregate(groupBy=[c], select=[c, SUM(distinct$0 a) AS sum$0, SUM(a) AS sum$1, COUNT(distinct$1 b) AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
       +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT((sum$0, count$1)) AS $f1, SUM_RETRACT((sum$2, count$3)) AS $f2, $SUM0_RETRACT(sum$4) AS $f3])
++- Exchange(distribution=[hash[c]])
+   +- LocalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT($f3_0) AS (sum$0, count$1), SUM_RETRACT($f4_0) AS (sum$2, count$3), $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
+      +- GlobalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 sum$0) AS $f3_0, SUM(sum$1) AS $f4_0, COUNT(distinct$1 count$2) AS $f5])
+         +- Exchange(distribution=[hash[c, $f3, $f4]])
+            +- LocalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 a) FILTER $g_1 AS sum$0, SUM(a) FILTER $g_3 AS sum$1, COUNT(distinct$1 b) FILTER $g_2 AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+               +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+                  +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+                     +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[c], select=[c, SUM(DISTINCT a) AS EXPR$1, SUM(a) AS EXPR$2, COUNT(DISTINCT b) AS EXPR$3])
++- Exchange(distribution=[hash[c]])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 01dcc34..48376b2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -507,4 +507,30 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($1)], EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalProject(c=[$2], a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM(sum$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(count$2) AS $f3])
++- Exchange(distribution=[hash[c]])
+   +- IncrementalGroupAggregate(partialAggGrouping=[c, $f3, $f4], finalAggGrouping=[c], select=[c, SUM(distinct$0 sum$0) AS sum$0, SUM(sum$1) AS sum$1, COUNT(distinct$1 count$2) AS count$2])
+      +- Exchange(distribution=[hash[c, $f3, $f4]])
+         +- LocalGroupAggregate(groupBy=[c, $f3, $f4], partialFinalType=[PARTIAL], select=[c, $f3, $f4, SUM(distinct$0 a) FILTER $g_1 AS sum$0, SUM(a) FILTER $g_3 AS sum$1, COUNT(distinct$1 b) FILTER $g_2 AS count$2, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
+            +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 3) AS $g_3, =($e, 2) AS $g_2])
+               +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
+                  +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 5cbbab4..7fbf0be 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,6 +103,11 @@ class DistinctAggregateTest(
   }
 
   @Test
+  def testTwoDistinctAggregateWithNonDistinctAgg(): Unit = {
+    util.verifyPlan("SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM MyTable GROUP BY c")
+  }
+
+  @Test
   def testSingleDistinctAggWithGroupBy(): Unit = {
     util.verifyPlan("SELECT a, COUNT(DISTINCT c) FROM MyTable GROUP BY a")
   }