You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 05:39:10 UTC
[flink] branch release-1.11 updated:
[FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates
wrong plan
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 197f3a3 [FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates wrong plan
197f3a3 is described below
commit 197f3a3c481a3dcdc0fa1661d2606fbae2fe3ef9
Author: Shuo Cheng <mf...@smail.nju.edu.cn>
AuthorDate: Mon May 25 13:37:29 2020 +0800
[FLINK-17651][table-planner-blink] DecomposeGroupingSetsRule generates wrong plan
when there exist a distinct agg and a simple agg with the same filter
This closes #12208
---
.../rules/logical/DecomposeGroupingSetsRule.scala | 3 ++-
.../table/planner/plan/utils/ExpandUtil.scala | 9 +++++--
.../plan/batch/sql/agg/DistinctAggregateTest.xml | 28 ++++++++++++++++++++++
...nkAggregateExpandDistinctAggregatesRuleTest.xml | 24 +++++++++++++++++++
.../plan/common/DistinctAggregateTestBase.scala | 7 ++++++
.../batch/sql/agg/AggregateITCaseBase.scala | 6 ++---
6 files changed, 71 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
index 3107521..7f516af 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala
@@ -250,8 +250,9 @@ class DecomposeGroupingSetsRule extends RelOptRule(
val newAggCalls = aggCallsWithIndexes.collect {
case (aggCall, idx) if !groupIdExprs.contains(idx) =>
val newArgList = aggCall.getArgList.map(a => duplicateFieldMap.getOrElse(a, a)).toList
+ val newFilterArg = duplicateFieldMap.getOrDefault(aggCall.filterArg, aggCall.filterArg)
aggCall.adaptTo(
- relBuilder.peek(), newArgList, aggCall.filterArg, agg.getGroupCount, newGroupCount)
+ relBuilder.peek(), newArgList, newFilterArg, agg.getGroupCount, newGroupCount)
}
// create simple aggregate
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
index c3e471a..6a76a8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
@@ -62,12 +62,17 @@ object ExpandUtil {
val commonGroupSet = groupSets.asList().reduce((g1, g2) => g1.intersect(g2)).asList()
val duplicateFieldIndexes = aggCalls.zipWithIndex.flatMap {
case (aggCall, idx) =>
+ // filterArg should also be considered here.
+ val allArgList = new util.ArrayList[Integer](aggCall.getArgList)
+ if (aggCall.filterArg > -1) {
+ allArgList.add(aggCall.filterArg)
+ }
if (groupIdExprs.contains(idx)) {
List.empty[Integer]
- } else if (commonGroupSet.containsAll(aggCall.getArgList)) {
+ } else if (commonGroupSet.containsAll(allArgList)) {
List.empty[Integer]
} else {
- aggCall.getArgList.diff(commonGroupSet)
+ allArgList.diff(commonGroupSet)
}
}.intersect(groupSet.asList()).sorted.toArray[Integer]
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
index dadee2d..0545632 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
@@ -667,4 +667,32 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testDistinctAggWithDuplicateFilterField">
+ <Resource name="sql">
+ <![CDATA[SELECT a, COUNT(c) FILTER (WHERE b > 1),
+COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $2], EXPR$2=[COUNT(DISTINCT $3) FILTER $2])
++- LogicalProject(a=[$0], c=[$2], $f2=[IS TRUE(>($1, 1))], d=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_COUNT(count$1) AS EXPR$2])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_3 AS min$0, Partial_COUNT(d) FILTER $g_0 AS count$1])
+ +- Calc(select=[a, d, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3])
+ +- HashAggregate(isMerge=[true], groupBy=[a, $f2, d, $e], select=[a, $f2, d, $e, Final_COUNT(count$0) AS EXPR$1])
+ +- Exchange(distribution=[hash[a, $f2, d, $e]])
+ +- LocalHashAggregate(groupBy=[a, $f2, d, $e], select=[a, $f2, d, $e, Partial_COUNT(c) FILTER $f2_0 AS count$0])
+ +- Expand(projects=[a, c, $f2, d, $e, $f2_0], projects=[{a, c, $f2, d, 0 AS $e, $f2 AS $f2_0}, {a, c, null AS $f2, null AS d, 3 AS $e, $f2 AS $f2_0}])
+ +- Calc(select=[a, c, IS TRUE(>(b, 1)) AS $f2, d])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
index 45e260e..2664a70 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
@@ -610,4 +610,28 @@ FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testDistinctAggWithDuplicateFilterField">
+ <Resource name="sql">
+ <![CDATA[SELECT a, COUNT(c) FILTER (WHERE b > 1),
+COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $2], EXPR$2=[COUNT(DISTINCT $3) FILTER $2])
++- LogicalProject(a=[$0], c=[$2], $f2=[IS TRUE(>($1, 1))], d=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
++- FlinkLogicalAggregate(group=[{0}], EXPR$1=[MIN($2) FILTER $4], EXPR$2=[COUNT($1) FILTER $3])
+ +- FlinkLogicalCalc(select=[a, d, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3])
+ +- FlinkLogicalAggregate(group=[{0, 2, 3, 4}], EXPR$1=[COUNT($1) FILTER $5])
+ +- FlinkLogicalExpand(projects=[a, c, $f2, d, $e, $f2_0])
+ +- FlinkLogicalCalc(select=[a, c, IS TRUE(>(b, 1)) AS $f2, d])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
index 6ec992f..fabc054 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
@@ -183,6 +183,13 @@ abstract class DistinctAggregateTestBase extends TableTestBase {
util.verifyPlan(sqlQuery)
}
+ @Test
+ def testDistinctAggWithDuplicateFilterField(): Unit = {
+ val sqlQuery = "SELECT a, COUNT(c) FILTER (WHERE b > 1),\n" +
+ "COUNT(DISTINCT d) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY a"
+ util.verifyPlan(sqlQuery)
+ }
+
@Test(expected = classOf[RuntimeException])
def testTooManyDistinctAggOnDifferentColumn(): Unit = {
// max group count must be less than 64
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
index eeb3188..07f7b58 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
@@ -136,9 +136,9 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
checkResult(
sql,
Seq(
- row(1,0,1,4),
- row(2,0,0,7),
- row(3,0,0,3)
+ row(1,4,1,4),
+ row(2,7,0,7),
+ row(3,3,0,3)
)
)
}