You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/19 02:31:59 UTC

[flink] branch master updated: [FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94fa4ce  [FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter
94fa4ce is described below

commit 94fa4ceade57172362e2d35e5aac8383f8f40a40
Author: shuo.cs <sh...@alibaba-inc.com>
AuthorDate: Fri Aug 16 13:11:14 2019 +0800

    [FLINK-13742][table-planner-blink] Fix wrong result when aggregation contains both distinct aggs with and without filter
    
    This closes #9459
---
 .../planner/codegen/agg/DistinctAggCodeGen.scala   |  8 ++++--
 .../runtime/stream/sql/AggregateITCase.scala       | 29 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index 78cb2f4..8db2d6d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -234,7 +234,9 @@ class DistinctAggCodeGen(
        """.stripMargin
     }
 
-    if (filterResults.exists(_.isDefined)) {
+    if (filterResults.forall(_.isDefined)) {
+      // Pre-filter with `condition` (to reduce state cost) only if every distinct agg
+      // on this column has a filter; else rows an unfiltered agg needs would be dropped.
       val condition = filterResults.flatten.mkString(" || ")
       s"""
          |if ($condition) {
@@ -281,7 +283,9 @@ class DistinctAggCodeGen(
          |}
        """.stripMargin
 
-    if (filterResults.exists(_.isDefined)) {
+    if (filterResults.forall(_.isDefined)) {
+      // Pre-filter with `condition` (to reduce state cost) only if every distinct agg
+      // on this column has a filter; else rows an unfiltered agg needs would be dropped.
       val condition = filterResults.flatten.mkString(" || ")
       s"""
          |if ($condition) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b3a9b..aa6f15b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1169,6 +1169,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testDistinctWithMultiFilter(): Unit = { // FLINK-13742: c has DISTINCT aggs with and without FILTER
+    val t = failingDataSource(TestData.tupleData3).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT
+         |  b,
+         |  SUM(DISTINCT (a * 3)),
+         |  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2)),
+         |  COUNT(DISTINCT c),
+         |  COUNT(DISTINCT c) filter (where MOD(a, 3) = 0),
+         |  COUNT(DISTINCT c) filter (where MOD(a, 3) = 1)
+         |FROM MyTable
+         |GROUP BY b
+       """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+    val expected = List( // one row per b; the six fields follow the SELECT list order
+      "1,3,1,1,0,1", "2,15,1,2,1,0",
+      "3,45,3,3,1,1", "4,102,1,4,1,2",
+      "5,195,1,5,2,1", "6,333,1,6,2,2")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted) // order-insensitive comparison
+  }
+
+  @Test
   def testPruneUselessAggCall(): Unit = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data .+= ((1, 1L, "Hi"))