You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/03 10:29:23 UTC
[flink] branch master updated: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba10c91 [FLINK-11013] [table] Fix distinct aggregates for group window in Table API
ba10c91 is described below
commit ba10c913426875c7f69366f6c64e110fc3dc634c
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Tue Nov 27 22:02:31 2018 +0800
[FLINK-11013] [table] Fix distinct aggregates for group window in Table API
This closes #7181
---
.../flink/table/plan/logical/operators.scala | 2 +
.../table/api/stream/table/AggregateTest.scala | 75 ++++++++++++++++++++++
2 files changed, 77 insertions(+)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 84e3f79..c0cfa24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -633,6 +633,8 @@ case class WindowAggregate(
case aggExpr: Aggregation
if aggExpr.getSqlAggFunction.requiresOver =>
failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].")
+ case aggExpr: DistinctAgg =>
+ validateAggregateExpression(aggExpr.child)
// check no nested aggregation exists.
case aggExpr: Aggregation =>
aggExpr.children.foreach { child =>
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
index 671f8dd..afa9f8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.TableTestBase
@@ -238,4 +239,78 @@ class AggregateTest extends TableTestBase {
util.verifyTable(resultTable, expected)
}
+
+ @Test
+ def testDistinctAggregateOnTumbleWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)](
+ "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+ val result = table
+ .window(Tumble over 15.minute on 'rowtime as 'w)
+ .groupBy('w)
+ .select('a.count.distinct, 'a.sum)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "rowtime")
+ ),
+ term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+ term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(a) AS TMP_1")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)](
+ "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+ val result = table
+ .window(Slide over 1.hour every 15.minute on 'rowtime as 'w)
+ .groupBy('w)
+ .select('a.count.distinct, 'a.sum.distinct, 'a.max.distinct)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "rowtime")
+ ),
+ term("window", SlidingGroupWindow('w, 'rowtime, 3600000.millis, 900000.millis)),
+ term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(DISTINCT a) AS TMP_1",
+ "MAX(DISTINCT a) AS TMP_2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)](
+ "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+ val result = table
+ .window(Session withGap 15.minute on 'rowtime as 'w)
+ .groupBy('a, 'w)
+ .select('a, 'a.count, 'c.count.distinct)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("groupBy", "a"),
+ term("window", SessionGroupWindow('w, 'rowtime, 900000.millis)),
+ term("select", "a", "COUNT(a) AS TMP_0", "COUNT(DISTINCT c) AS TMP_1")
+ )
+
+ util.verifyTable(result, expected)
+ }
}