You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/07 19:21:50 UTC
[3/3] flink git commit: [FLINK-8690] [table] Add DISTINCT aggregates
for group windows on streaming tables.
[FLINK-8690] [table] Add DISTINCT aggregates for group windows on streaming tables.
This closes #5940.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53610c31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53610c31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53610c31
Branch: refs/heads/master
Commit: 53610c31e88d3c4194990de70fb99d9f935f2e0d
Parents: d65d932
Author: Rong Rong <ro...@uber.com>
Authored: Sat Apr 28 08:59:12 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon May 7 18:14:37 2018 +0200
----------------------------------------------------------------------
.../codegen/AggregationCodeGenerator.scala | 3 +-
.../table/plan/nodes/CommonAggregate.scala | 8 +-
.../nodes/logical/FlinkLogicalAggregate.scala | 4 +-
.../flink/table/plan/rules/FlinkRuleSets.scala | 8 +-
.../stream/sql/DistinctAggregateTest.scala | 245 -------------------
.../table/api/stream/sql/AggregateTest.scala | 40 ---
.../api/stream/sql/DistinctAggregateTest.scala | 140 +++++++++++
.../flink/table/plan/RetractionRulesTest.scala | 14 +-
.../table/runtime/stream/sql/SqlITCase.scala | 77 ++++++
9 files changed, 235 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index d6a7b1a..11c0008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -639,7 +639,8 @@ class AggregationCodeGenerator(
|
| while (mergeIt$i.hasNext()) {
| java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
- | Object k = entry.getKey();
+ | ${classOf[Row].getCanonicalName} k =
+ | (${classOf[Row].getCanonicalName}) entry.getKey();
| Long v = (Long) entry.getValue();
| if (aDistinctAcc$i.add(k, v)) {
| ${aggs(i)}.accumulate(aAcc$i, k);
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
index 21cb60b..7960c8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -49,12 +49,8 @@ trait CommonAggregate {
val aggs = namedAggregates.map(_.getKey)
val aggStrings = aggs.map( a => s"${a.getAggregation}(${
- val d = if (a.isDistinct) {
- "DISTINCT "
- } else {
- ""
- }
- d + (if (a.getArgList.size() > 0) {
+ val prefix = if (a.isDistinct) "DISTINCT " else ""
+ prefix + (if (a.getArgList.size() > 0) {
a.getArgList.asScala.map(inFields(_)).mkString(", ")
} else {
"*"
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index 17b6f1b..9cf14d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -74,15 +74,13 @@ private class FlinkLogicalAggregateConverter
// we do not support these functions natively
// they have to be converted using the AggregateReduceFunctionsRule
- val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
// we support AVG
case SqlKind.AVG => true
// but none of the other AVG agg functions
case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
case _ => true
}
-
- !agg.containsDistinctCall() && supported
}
override def convert(rel: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 9f3b8e9..52dab8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -88,8 +88,6 @@ object FlinkRuleSets {
AggregateJoinTransposeRule.EXTENDED,
// aggregate union rule
AggregateUnionAggregateRule.INSTANCE,
- // expand distinct aggregate to normal aggregate with groupby
- AggregateExpandDistinctAggregatesRule.JOIN,
// reduce aggregate functions like AVG, STDDEV_POP etc.
AggregateReduceFunctionsRule.INSTANCE,
@@ -138,7 +136,6 @@ object FlinkRuleSets {
FlinkLogicalNativeTableScan.CONVERTER
)
-
/**
* RuleSet to normalize plans for batch / DataSet execution
*/
@@ -155,7 +152,10 @@ object FlinkRuleSets {
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE,
WindowPropertiesRule.INSTANCE,
- WindowPropertiesHavingRule.INSTANCE
+ WindowPropertiesHavingRule.INSTANCE,
+
+ // expand distinct aggregate to normal aggregate with groupby
+ AggregateExpandDistinctAggregatesRule.JOIN
)
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
deleted file mode 100644
index 3b72f61..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.TumblingGroupWindow
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class DistinctAggregateTest extends TableTestBase {
-
- @Test
- def testSingleDistinctAggregate(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable " +
- "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0", "a")
- ),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(DISTINCT a) AS EXPR$0")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testMultiDistinctAggregateOnSameColumn(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable " +
- "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0", "a")
- ),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1",
- "MAX(DISTINCT a) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
- val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(c) FROM MyTable " +
- "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected0 = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
- ),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(c) AS EXPR$1")
- )
-
- util.verifySql(sqlQuery0, expected0)
-
- // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
- val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT c) FROM MyTable" +
- " GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected1 = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
- ),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
- )
-
- util.verifySql(sqlQuery1, expected1)
- }
-
- @Test
- def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c) FROM MyTable " +
- "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
- ),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c), COUNT(b) FROM MyTable " +
- "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- streamTableNode(0),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1",
- "COUNT(b) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSingleDistinctAggregateWithGrouping(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT c) FROM MyTable " +
- "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
- ),
- term("groupBy", "a"),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "a", "COUNT(a) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c) FROM MyTable " +
- "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
- ),
- term("groupBy", "a"),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT c) FROM MyTable " +
- "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
- ),
- term("groupBy", "a"),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
- "COUNT(DISTINCT c) AS EXPR$3")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
- val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT b) FROM MyTable " +
- "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
- val expected = unaryNode(
- "DataStreamGroupWindowAggregate",
- streamTableNode(0),
- term("groupBy", "a"),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
- "COUNT(DISTINCT b) AS EXPR$3")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index 76d33c2..bb19036 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -61,46 +61,6 @@ class AggregateTest extends TableTestBase {
}
@Test
- def testDistinct(): Unit = {
- val sql = "SELECT DISTINCT a, b, c FROM MyTable"
-
- val expected =
- unaryNode(
- "DataStreamGroupAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a, b, c")
- ),
- term("groupBy", "a, b, c"),
- term("select", "a, b, c")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
- // TODO: reopen this until FLINK-7144 fixed
- @Ignore
- @Test
- def testDistinctAfterAggregate(): Unit = {
- val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
-
- val expected =
- unaryNode(
- "DataStreamGroupAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a")
- ),
- term("groupBy", "a"),
- term("select", "a")
- )
- streamUtil.verifySql(sql, expected)
- }
-
-
- @Test
def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
streamUtil.addFunction("udag", new MyAgg)
val call = streamUtil
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
new file mode 100644
index 0000000..1ce63c6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class DistinctAggregateTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)](
+ "MyTable",
+ 'a, 'b, 'c,
+ 'proctime.proctime, 'rowtime.rowtime)
+
+ @Test
+ def testDistinct(): Unit = {
+ val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a, b, c")
+ ),
+ term("groupBy", "a, b, c"),
+ term("select", "a, b, c")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
+ // TODO: re-enable this test once FLINK-7144 is fixed
+ @Ignore
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a")
+ ),
+ term("groupBy", "a"),
+ term("select", "a")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testDistinctAggregateOnTumbleWindow(): Unit = {
+ val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+ " SUM(a) " +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) "
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime", "a")
+ ),
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+ val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+ " SUM(DISTINCT a), " +
+ " MAX(DISTINCT a) " +
+ "FROM MyTable " +
+ "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) "
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime", "a")
+ ),
+ term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
+ term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1",
+ "MAX(DISTINCT a) AS EXPR$2")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+ val sqlQuery = "SELECT a, " +
+ " COUNT(a), " +
+ " SUM(DISTINCT c) " +
+ "FROM MyTable " +
+ "GROUP BY a, SESSION(rowtime, INTERVAL '15' MINUTE) "
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "rowtime", "c")
+ ),
+ term("groupBy", "a"),
+ term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select", "a", "COUNT(a) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 3541f9f..ff3fdf9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -250,18 +250,22 @@ class RetractionRulesTest extends TableTestBase {
val expected =
unaryNode(
"DataStreamGroupAggregate",
- unaryNode(
- "DataStreamCalc",
- binaryNode(
- "DataStreamUnion",
+ binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
unaryNode(
"DataStreamGroupAggregate",
"DataStreamScan(true, Acc)",
"true, AccRetract"
),
- "DataStreamScan(true, Acc)",
"true, AccRetract"
),
+ unaryNode(
+ "DataStreamCalc",
+ "DataStreamScan(true, Acc)",
+ "true, Acc"
+ ),
"true, AccRetract"
),
s"$defaultStatus"
http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index b7950b7..9155ff9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -53,6 +53,83 @@ class SqlITCase extends StreamingWithStateTestBase {
(20000L, "20", "Hello World"))
@Test
+ def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+ // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge
+ val sessionWindowTestData = List(
+ (1L, 2, "Hello"), // (1, Hello) - window
+ (2L, 2, "Hello"), // (1, Hello) - window, deduped
+ (8L, 2, "Hello"), // (2, Hello) - window, deduped during merge
+ (10L, 3, "Hello"), // (2, Hello) - window, forwarded during merge
+ (9L, 9, "Hello World"), // (1, Hello World) - window
+ (4L, 1, "Hello"), // (1, Hello) - window, triggering merge
+ (16L, 16, "Hello")) // (3, Hello) - window (not merged)
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+ StreamITCase.clear
+ val stream = env
+ .fromCollection(sessionWindowTestData)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("MyTable", table)
+
+ val sqlQuery = "SELECT c, " +
+ " COUNT(DISTINCT b)," +
+ " SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +
+ "FROM MyTable " +
+ "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c "
+
+ val results = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L}
+ "Hello,1,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged
+ "Hello,3,1970-01-01 00:00:00.015" // window starts at [1L,2L],
+ // merged with [8L,10L], by [4L], till {15L}
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testDistinctAggOnRowTimeTumbleWindow(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env).assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(DISTINCT e), " +
+ " MIN(DISTINCT e), " +
+ " COUNT(DISTINCT e)" +
+ "FROM MyTable " +
+ "GROUP BY a, " +
+ " TUMBLE(rowtime, INTERVAL '5' SECOND) "
+
+ val results = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "1,1,1,1",
+ "2,3,1,2",
+ "3,5,2,2",
+ "4,3,1,2",
+ "5,6,1,3")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
def testRowTimeTumbleWindow(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment