You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 05:39:10 UTC

[flink] branch release-1.11 updated: [FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates wrong plan

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 197f3a3   [FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates wrong plan
197f3a3 is described below

commit 197f3a3c481a3dcdc0fa1661d2606fbae2fe3ef9
Author: Shuo Cheng <mf...@smail.nju.edu.cn>
AuthorDate: Mon May 25 13:37:29 2020 +0800

     [FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates wrong plan
    
    when there exist distinct agg and simple agg with same filter
    
    This closes #12208
---
 .../rules/logical/DecomposeGroupingSetsRule.scala  |  3 ++-
 .../table/planner/plan/utils/ExpandUtil.scala      |  9 +++++--
 .../plan/batch/sql/agg/DistinctAggregateTest.xml   | 28 ++++++++++++++++++++++
 ...nkAggregateExpandDistinctAggregatesRuleTest.xml | 24 +++++++++++++++++++
 .../plan/common/DistinctAggregateTestBase.scala    |  7 ++++++
 .../batch/sql/agg/AggregateITCaseBase.scala        |  6 ++---
 6 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
index 3107521..7f516af 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
@@ -250,8 +250,9 @@ class DecomposeGroupingSetsRule extends RelOptRule(
     val newAggCalls = aggCallsWithIndexes.collect {
       case (aggCall, idx) if !groupIdExprs.contains(idx) =>
         val newArgList = aggCall.getArgList.map(a => duplicateFieldMap.getOrElse(a, a)).toList
+        val newFilterArg = duplicateFieldMap.getOrDefault(aggCall.filterArg, aggCall.filterArg)
         aggCall.adaptTo(
-          relBuilder.peek(), newArgList, aggCall.filterArg, agg.getGroupCount, newGroupCount)
+          relBuilder.peek(), newArgList, newFilterArg, agg.getGroupCount, newGroupCount)
     }
 
     // create simple aggregate
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
index c3e471a..6a76a8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
@@ -62,12 +62,17 @@ object ExpandUtil {
     val commonGroupSet = groupSets.asList().reduce((g1, g2) => g1.intersect(g2)).asList()
     val duplicateFieldIndexes = aggCalls.zipWithIndex.flatMap {
       case (aggCall, idx) =>
+        // filterArg should also be considered here.
+        val allArgList = new util.ArrayList[Integer](aggCall.getArgList)
+        if (aggCall.filterArg > -1) {
+          allArgList.add(aggCall.filterArg)
+        }
         if (groupIdExprs.contains(idx)) {
           List.empty[Integer]
-        } else if (commonGroupSet.containsAll(aggCall.getArgList)) {
+        } else if (commonGroupSet.containsAll(allArgList)) {
           List.empty[Integer]
         } else {
-          aggCall.getArgList.diff(commonGroupSet)
+          allArgList.diff(commonGroupSet)
         }
     }.intersect(groupSet.asList()).sorted.toArray[Integer]
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
index dadee2d..0545632 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
@@ -667,4 +667,32 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testDistinctAggWithDuplicateFilterField">
+    <Resource name="sql">
+	  <![CDATA[SELECT a, COUNT(c) FILTER (WHERE b > 1),
+COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a]]>
+	</Resource>
+	<Resource name="planBefore">
+	  <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $2], EXPR$2=[COUNT(DISTINCT $3) FILTER $2])
++- LogicalProject(a=[$0], c=[$2], $f2=[IS TRUE(>($1, 1))], d=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+	</Resource>
+	<Resource name="planAfter">
+	  <![CDATA[
+Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_COUNT(count$1) AS EXPR$2])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_3 AS min$0, Partial_COUNT(d) FILTER $g_0 AS count$1])
+         +- Calc(select=[a, d, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3])
+            +- HashAggregate(isMerge=[true], groupBy=[a, $f2, d, $e], select=[a, $f2, d, $e, Final_COUNT(count$0) AS EXPR$1])
+               +- Exchange(distribution=[hash[a, $f2, d, $e]])
+                  +- LocalHashAggregate(groupBy=[a, $f2, d, $e], select=[a, $f2, d, $e, Partial_COUNT(c) FILTER $f2_0 AS count$0])
+                     +- Expand(projects=[a, c, $f2, d, $e, $f2_0], projects=[{a, c, $f2, d, 0 AS $e, $f2 AS $f2_0}, {a, c, null AS $f2, null AS d, 3 AS $e, $f2 AS $f2_0}])
+                        +- Calc(select=[a, c, IS TRUE(>(b, 1)) AS $f2, d])
+                           +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+	</Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
index 45e260e..2664a70 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
@@ -610,4 +610,28 @@ FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testDistinctAggWithDuplicateFilterField">
+    <Resource name="sql">
+	  <![CDATA[SELECT a, COUNT(c) FILTER (WHERE b > 1),
+COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a]]>
+	</Resource>
+	<Resource name="planBefore">
+	  <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $2], EXPR$2=[COUNT(DISTINCT $3) FILTER $2])
++- LogicalProject(a=[$0], c=[$2], $f2=[IS TRUE(>($1, 1))], d=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+	</Resource>
+	<Resource name="planAfter">
+	  <![CDATA[
+FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
++- FlinkLogicalAggregate(group=[{0}], EXPR$1=[MIN($2) FILTER $4], EXPR$2=[COUNT($1) FILTER $3])
+   +- FlinkLogicalCalc(select=[a, d, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3])
+      +- FlinkLogicalAggregate(group=[{0, 2, 3, 4}], EXPR$1=[COUNT($1) FILTER $5])
+         +- FlinkLogicalExpand(projects=[a, c, $f2, d, $e, $f2_0])
+            +- FlinkLogicalCalc(select=[a, c, IS TRUE(>(b, 1)) AS $f2, d])
+               +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+	</Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
index 6ec992f..fabc054 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
@@ -183,6 +183,13 @@ abstract class DistinctAggregateTestBase extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+  @Test
+  def testDistinctAggWithDuplicateFilterField(): Unit = {
+    val sqlQuery = "SELECT a, COUNT(c) FILTER (WHERE b > 1),\n" +
+      "COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a"
+    util.verifyPlan(sqlQuery)
+  }
+
   @Test(expected = classOf[RuntimeException])
   def testTooManyDistinctAggOnDifferentColumn(): Unit = {
     // max group count must be less than 64
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
index eeb3188..07f7b58 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
@@ -136,9 +136,9 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkResult(
       sql,
       Seq(
-        row(1,0,1,4),
-        row(2,0,0,7),
-        row(3,0,0,3)
+        row(1,4,1,4),
+        row(2,7,0,7),
+        row(3,3,0,3)
       )
     )
   }