You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/19 02:31:59 UTC
[flink] branch master updated: [FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94fa4ce [FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter
94fa4ce is described below
commit 94fa4ceade57172362e2d35e5aac8383f8f40a40
Author: shuo.cs <sh...@alibaba-inc.com>
AuthorDate: Fri Aug 16 13:11:14 2019 +0800
[FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter
This closes #9459
---
.../planner/codegen/agg/DistinctAggCodeGen.scala | 8 ++++--
.../runtime/stream/sql/AggregateITCase.scala | 29 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index 78cb2f4..8db2d6d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -234,7 +234,9 @@ class DistinctAggCodeGen(
""".stripMargin
}
- if (filterResults.exists(_.isDefined)) {
+ if (filterResults.forall(_.isDefined)) {
+ // using the `condition` below to filter data so as to reduce state cost
+ // if all distinct aggregations on same column have filter.
val condition = filterResults.flatten.mkString(" || ")
s"""
|if ($condition) {
@@ -281,7 +283,9 @@ class DistinctAggCodeGen(
|}
""".stripMargin
- if (filterResults.exists(_.isDefined)) {
+ if (filterResults.forall(_.isDefined)) {
+ // using the `condition` below to filter data so as to reduce state cost
+ // if all distinct aggregations on same column have filter.
val condition = filterResults.flatten.mkString(" || ")
s"""
|if ($condition) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b3a9b..aa6f15b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1169,6 +1169,35 @@ class AggregateITCase(
}
@Test
+ def testDistinctWithMultiFilter(): Unit = {
+ val t = failingDataSource(TestData.tupleData3).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery =
+ s"""
+ |SELECT
+ | b,
+ | SUM(DISTINCT (a * 3)),
+ | COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2)),
+ | COUNT(DISTINCT c),
+ | COUNT(DISTINCT c) filter (where MOD(a, 3) = 0),
+ | COUNT(DISTINCT c) filter (where MOD(a, 3) = 1)
+ |FROM MyTable
+ |GROUP BY b
+ """.stripMargin
+
+ val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+ val sink = new TestingRetractSink
+ result.addSink(sink)
+ env.execute()
+ val expected = List(
+ "1,3,1,1,0,1", "2,15,1,2,1,0",
+ "3,45,3,3,1,1", "4,102,1,4,1,2",
+ "5,195,1,5,2,1", "6,333,1,6,2,2")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
+ @Test
def testPruneUselessAggCall(): Unit = {
val data = new mutable.MutableList[(Int, Long, String)]
data .+= ((1, 1L, "Hi"))