Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/03 10:31:07 UTC

[flink] branch release-1.7 updated: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new fe1f7cb  [FLINK-11013] [table] Fix distinct aggregates for group window in Table API
fe1f7cb is described below

commit fe1f7cbd0ae4f945dcab3f589bbc51f380aad94d
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Tue Nov 27 22:02:31 2018 +0800

    [FLINK-11013] [table] Fix distinct aggregates for group window in Table API
    
    This closes #7181
---
 .../flink/table/plan/logical/operators.scala       |  2 +
 .../table/api/stream/table/AggregateTest.scala     | 75 ++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 84e3f79..c0cfa24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -633,6 +633,8 @@ case class WindowAggregate(
       case aggExpr: Aggregation
         if aggExpr.getSqlAggFunction.requiresOver =>
         failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].")
+      case aggExpr: DistinctAgg =>
+        validateAggregateExpression(aggExpr.child)
       // check no nested aggregation exists.
       case aggExpr: Aggregation =>
         aggExpr.children.foreach { child =>
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
index 671f8dd..afa9f8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
@@ -238,4 +239,78 @@ class AggregateTest extends TableTestBase {
 
     util.verifyTable(resultTable, expected)
   }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Tumble over 15.minute on 'rowtime as 'w)
+      .groupBy('w)
+      .select('a.count.distinct, 'a.sum)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "rowtime")
+      ),
+      term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(a) AS TMP_1")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Slide over 1.hour every 15.minute on 'rowtime as 'w)
+      .groupBy('w)
+      .select('a.count.distinct, 'a.sum.distinct, 'a.max.distinct)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "rowtime")
+      ),
+      term("window", SlidingGroupWindow('w, 'rowtime, 3600000.millis, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(DISTINCT a) AS TMP_1",
+           "MAX(DISTINCT a) AS TMP_2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Session withGap 15.minute on 'rowtime as 'w)
+      .groupBy('a, 'w)
+      .select('a, 'a.count, 'c.count.distinct)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "c", "rowtime")
+      ),
+      term("groupBy", "a"),
+      term("window", SessionGroupWindow('w, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(a) AS TMP_0", "COUNT(DISTINCT c) AS TMP_1")
+    )
+
+    util.verifyTable(result, expected)
+  }
 }
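
For reference, the queries exercised by the new plan tests look as follows when written against the Flink 1.7 Scala Table API. This is a minimal, self-contained sketch and not part of the patch: the object name, the sample data, and the use of explain() to print the plan are illustrative assumptions. The added DistinctAgg case in WindowAggregate validation recurses into the wrapped aggregate, so a query like the one below is no longer rejected as a nested aggregation.

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object DistinctGroupWindowExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // A rowtime attribute requires event-time semantics.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Illustrative input; field 'b doubles as the event-time timestamp.
        val input = env
          .fromElements((1, 1000L, "hello"), (2, 2000L, "world"))
          .assignAscendingTimestamps(_._2)

        val table = input.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

        // Distinct aggregate on a tumbling group window, mirroring
        // testDistinctAggregateOnTumbleWindow above. With the fix this
        // validates and plans into a DataStreamGroupWindowAggregate.
        val result = table
          .window(Tumble over 15.minute on 'rowtime as 'w)
          .groupBy('w)
          .select('a.count.distinct, 'a.sum)

        // Print the resulting plan instead of executing the job.
        println(tEnv.explain(result))
      }
    }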