You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/07 19:21:49 UTC

[2/3] flink git commit: [FLINK-8690] [table] Add DISTINCT aggregates for group windows on streaming tables.

[FLINK-8690] [table] Add DISTINCT aggregates for group windows on streaming tables.

This closes #3764.
This closes #3765 (already resolved by another PR).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d65d9324
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d65d9324
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d65d9324

Branch: refs/heads/master
Commit: d65d9324557f961d9d4c9b7f4132230bba8823dc
Parents: 7a31ffd
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Apr 24 11:22:07 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon May 7 18:14:37 2018 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/CommonAggregate.scala      |   9 +-
 .../common/LogicalWindowAggregateRule.scala     |   3 +-
 .../DataStreamGroupWindowAggregateRule.scala    |   8 +-
 .../stream/sql/DistinctAggregateTest.scala      | 245 +++++++++++++++++++
 4 files changed, 254 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d65d9324/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
index e95747c..21cb60b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -49,11 +49,16 @@ trait CommonAggregate {
 
     val aggs = namedAggregates.map(_.getKey)
     val aggStrings = aggs.map( a => s"${a.getAggregation}(${
-      if (a.getArgList.size() > 0) {
+      val d = if (a.isDistinct) {
+        "DISTINCT "
+      } else {
+        ""
+      }
+      d + (if (a.getArgList.size() > 0) {
         a.getArgList.asScala.map(inFields(_)).mkString(", ")
       } else {
         "*"
-      }
+      })
     })")
 
     val propStrings = namedProperties.map(_.property.toString)

http://git-wip-us.apache.org/repos/asf/flink/blob/d65d9324/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
index 927700b..11701fc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
@@ -40,7 +40,6 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg = call.rel(0).asInstanceOf[LogicalAggregate]
 
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
     val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
 
     val windowExpressions = getWindowExpressions(agg)
@@ -48,7 +47,7 @@ abstract class LogicalWindowAggregateRule(ruleName: String)
       throw new TableException("Only a single window group function may be used in GROUP BY")
     }
 
-    !distinctAggs && !groupSets && !agg.indicator && windowExpressions.nonEmpty
+    !groupSets && !agg.indicator && windowExpressions.nonEmpty
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d65d9324/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
index 3beeb47..b77b7bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
@@ -39,19 +39,13 @@ class DataStreamGroupWindowAggregateRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
 
-    // check if we have distinct aggregates
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    if (distinctAggs) {
-      throw TableException("DISTINCT aggregates are currently not supported.")
-    }
-
     // check if we have grouping sets
     val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
     if (groupSets || agg.indicator) {
       throw TableException("GROUPING SETS are currently not supported.")
     }
 
-    !distinctAggs && !groupSets && !agg.indicator
+    !groupSets && !agg.indicator
   }
 
   override def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/d65d9324/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
new file mode 100644
index 0000000..3b72f61
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class DistinctAggregateTest extends TableTestBase {
+
+  @Test
+  def testSingleDistinctAggregate(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "1970-01-01 00:00:00 AS $f0", "a")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateOnSameColumn(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "1970-01-01 00:00:00 AS $f0", "a")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1",
+        "MAX(DISTINCT a) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    // case 0x00: DISTINCT on COUNT, non-DISTINCT on the other aggregate (SUM)
+    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(c) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected0 = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(c) AS EXPR$1")
+    )
+
+    util.verifySql(sqlQuery0, expected0)
+
+    // case 0x01: non-DISTINCT on COUNT, DISTINCT on the other aggregate (SUM)
+    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT c) FROM MyTable" +
+      " GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected1 = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
+    )
+
+    util.verifySql(sqlQuery1, expected1)
+  }
+
+  @Test
+  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c), COUNT(b) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1",
+        "COUNT(b) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testSingleDistinctAggregateWithGrouping(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT c) FROM MyTable " +
+      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
+      ),
+      term("groupBy", "a"),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(a) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c) FROM MyTable " +
+      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
+      ),
+      term("groupBy", "a"),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT c) FROM MyTable " +
+      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
+      ),
+      term("groupBy", "a"),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
+        "COUNT(DISTINCT c) AS EXPR$3")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT b) FROM MyTable " +
+      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "a"),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
+        "COUNT(DISTINCT b) AS EXPR$3")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+}