You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/07 19:21:50 UTC

[3/3] flink git commit: [FLINK-8690] [table] Add DISTINCT aggregates for group windows on streaming tables.

[FLINK-8690] [table] Add DISTINCT aggregates for group windows on streaming tables.

This closes #5940.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53610c31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53610c31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53610c31

Branch: refs/heads/master
Commit: 53610c31e88d3c4194990de70fb99d9f935f2e0d
Parents: d65d932
Author: Rong Rong <ro...@uber.com>
Authored: Sat Apr 28 08:59:12 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon May 7 18:14:37 2018 +0200

----------------------------------------------------------------------
 .../codegen/AggregationCodeGenerator.scala      |   3 +-
 .../table/plan/nodes/CommonAggregate.scala      |   8 +-
 .../nodes/logical/FlinkLogicalAggregate.scala   |   4 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   8 +-
 .../stream/sql/DistinctAggregateTest.scala      | 245 -------------------
 .../table/api/stream/sql/AggregateTest.scala    |  40 ---
 .../api/stream/sql/DistinctAggregateTest.scala  | 140 +++++++++++
 .../flink/table/plan/RetractionRulesTest.scala  |  14 +-
 .../table/runtime/stream/sql/SqlITCase.scala    |  77 ++++++
 9 files changed, 235 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index d6a7b1a..11c0008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -639,7 +639,8 @@ class AggregationCodeGenerator(
                |
                |    while (mergeIt$i.hasNext()) {
                |      java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
-               |      Object k = entry.getKey();
+               |      ${classOf[Row].getCanonicalName} k =
+               |          (${classOf[Row].getCanonicalName}) entry.getKey();
                |      Long v = (Long) entry.getValue();
                |      if (aDistinctAcc$i.add(k, v)) {
                |        ${aggs(i)}.accumulate(aAcc$i, k);

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
index 21cb60b..7960c8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -49,12 +49,8 @@ trait CommonAggregate {
 
     val aggs = namedAggregates.map(_.getKey)
     val aggStrings = aggs.map( a => s"${a.getAggregation}(${
-      val d = if (a.isDistinct) {
-        "DISTINCT "
-      } else {
-        ""
-      }
-      d + (if (a.getArgList.size() > 0) {
+      val prefix = if (a.isDistinct) "DISTINCT " else ""
+      prefix + (if (a.getArgList.size() > 0) {
         a.getArgList.asScala.map(inFields(_)).mkString(", ")
       } else {
         "*"

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index 17b6f1b..9cf14d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -74,15 +74,13 @@ private class FlinkLogicalAggregateConverter
 
     // we do not support these functions natively
     // they have to be converted using the AggregateReduceFunctionsRule
-    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+    agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
       // we support AVG
       case SqlKind.AVG => true
       // but none of the other AVG agg functions
       case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
       case _ => true
     }
-
-    !agg.containsDistinctCall() && supported
   }
 
   override def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 9f3b8e9..52dab8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -88,8 +88,6 @@ object FlinkRuleSets {
     AggregateJoinTransposeRule.EXTENDED,
     // aggregate union rule
     AggregateUnionAggregateRule.INSTANCE,
-    // expand distinct aggregate to normal aggregate with groupby
-    AggregateExpandDistinctAggregatesRule.JOIN,
 
     // reduce aggregate functions like AVG, STDDEV_POP etc.
     AggregateReduceFunctionsRule.INSTANCE,
@@ -138,7 +136,6 @@ object FlinkRuleSets {
     FlinkLogicalNativeTableScan.CONVERTER
   )
 
-
   /**
     * RuleSet to normalize plans for batch / DataSet execution
     */
@@ -155,7 +152,10 @@ object FlinkRuleSets {
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
     WindowPropertiesRule.INSTANCE,
-    WindowPropertiesHavingRule.INSTANCE
+    WindowPropertiesHavingRule.INSTANCE,
+
+    // expand distinct aggregate to normal aggregate with groupby
+    AggregateExpandDistinctAggregatesRule.JOIN
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
deleted file mode 100644
index 3b72f61..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DistinctAggregateTest.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.TumblingGroupWindow
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class DistinctAggregateTest extends TableTestBase {
-
-  @Test
-  def testSingleDistinctAggregate(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable " +
-      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "1970-01-01 00:00:00 AS $f0", "a")
-      ),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(DISTINCT a) AS EXPR$0")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnSameColumn(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable " +
-      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "1970-01-01 00:00:00 AS $f0", "a")
-      ),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1",
-        "MAX(DISTINCT a) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(c) FROM MyTable " +
-      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected0 = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
-      ),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(c) AS EXPR$1")
-    )
-
-    util.verifySql(sqlQuery0, expected0)
-
-    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
-    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT c) FROM MyTable" +
-      " GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected1 = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
-      ),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
-    )
-
-    util.verifySql(sqlQuery1, expected1)
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c) FROM MyTable " +
-      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "1970-01-01 00:00:00 AS $f0", "a", "c")
-      ),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT c), COUNT(b) FROM MyTable " +
-      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT c) AS EXPR$1",
-        "COUNT(b) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGrouping(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT c) FROM MyTable " +
-      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
-      ),
-      term("groupBy", "a"),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "a", "COUNT(a) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c) FROM MyTable " +
-      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
-      ),
-      term("groupBy", "a"),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT c) FROM MyTable " +
-      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "a", "1970-01-01 00:00:00 AS $f1", "c")
-      ),
-      term("groupBy", "a"),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
-        "COUNT(DISTINCT c) AS EXPR$3")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT c), COUNT(DISTINCT b) FROM MyTable " +
-      "GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "a"),
-      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-      term("select", "a", "COUNT(*) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2",
-        "COUNT(DISTINCT b) AS EXPR$3")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index 76d33c2..bb19036 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -61,46 +61,6 @@ class AggregateTest extends TableTestBase {
   }
 
   @Test
-  def testDistinct(): Unit = {
-    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "a, b, c")
-        ),
-        term("groupBy", "a, b, c"),
-        term("select", "a, b, c")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
-  // TODO: reopen this until FLINK-7144 fixed
-  @Ignore
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "a")
-        ),
-        term("groupBy", "a"),
-        term("select", "a")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-
-  @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
     streamUtil.addFunction("udag", new MyAgg)
     val call = streamUtil

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
new file mode 100644
index 0000000..1ce63c6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class DistinctAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+    "MyTable",
+    'a, 'b, 'c,
+    'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testDistinct(): Unit = {
+    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a, b, c")
+        ),
+        term("groupBy", "a, b, c"),
+        term("select", "a, b, c")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
+  // TODO: re-enable this test once FLINK-7144 is fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a")
+        ),
+        term("groupBy", "a"),
+        term("select", "a")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+    val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+      "  SUM(a) " +
+      "FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) "
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "rowtime", "a")
+      ),
+      term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1")
+    )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+    val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+      "  SUM(DISTINCT a), " +
+      "  MAX(DISTINCT a) " +
+      "FROM MyTable " +
+      "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) "
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "rowtime", "a")
+      ),
+      term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1",
+        "MAX(DISTINCT a) AS EXPR$2")
+    )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+    val sqlQuery = "SELECT a, " +
+      "  COUNT(a), " +
+      "  SUM(DISTINCT c) " +
+      "FROM MyTable " +
+      "GROUP BY a, SESSION(rowtime, INTERVAL '15' MINUTE) "
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "rowtime", "c")
+      ),
+      term("groupBy", "a"),
+      term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(a) AS EXPR$1", "SUM(DISTINCT c) AS EXPR$2")
+    )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 3541f9f..ff3fdf9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -250,18 +250,22 @@ class RetractionRulesTest extends TableTestBase {
     val expected =
       unaryNode(
         "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          binaryNode(
-            "DataStreamUnion",
+        binaryNode(
+          "DataStreamUnion",
+          unaryNode(
+            "DataStreamCalc",
             unaryNode(
               "DataStreamGroupAggregate",
               "DataStreamScan(true, Acc)",
               "true, AccRetract"
             ),
-            "DataStreamScan(true, Acc)",
             "true, AccRetract"
           ),
+          unaryNode(
+            "DataStreamCalc",
+            "DataStreamScan(true, Acc)",
+            "true, Acc"
+          ),
           "true, AccRetract"
         ),
         s"$defaultStatus"

http://git-wip-us.apache.org/repos/asf/flink/blob/53610c31/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index b7950b7..9155ff9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -53,6 +53,83 @@ class SqlITCase extends StreamingWithStateTestBase {
     (20000L, "20", "Hello World"))
 
   @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+    // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge
+    val sessionWindowTestData = List(
+      (1L, 2, "Hello"),       // (1, Hello)       - window
+      (2L, 2, "Hello"),       // (1, Hello)       - window, deduped
+      (8L, 2, "Hello"),       // (2, Hello)       - window, deduped during merge
+      (10L, 3, "Hello"),      // (2, Hello)       - window, forwarded during merge
+      (9L, 9, "Hello World"), // (1, Hello World) - window
+      (4L, 1, "Hello"),       // (1, Hello)       - window, triggering merge
+      (16L, 16, "Hello"))     // (3, Hello)       - window (not merged)
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    StreamITCase.clear
+    val stream = env
+      .fromCollection(sessionWindowTestData)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("MyTable", table)
+
+    val sqlQuery = "SELECT c, " +
+      "  COUNT(DISTINCT b)," +
+      "  SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +
+      "FROM MyTable " +
+      "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c "
+
+    val results = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L}
+      "Hello,1,1970-01-01 00:00:00.021",       // window starts at [16L] till {21L}, not merged
+      "Hello,3,1970-01-01 00:00:00.015"        // window starts at [1L,2L],
+                                               //   merged with [8L,10L], by [4L], till {15L}
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testDistinctAggOnRowTimeTumbleWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env).assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(DISTINCT e), " +
+      "  MIN(DISTINCT e), " +
+      "  COUNT(DISTINCT e)" +
+      "FROM MyTable " +
+      "GROUP BY a, " +
+      "  TUMBLE(rowtime, INTERVAL '5' SECOND) "
+
+    val results = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,1,1,1",
+      "2,3,1,2",
+      "3,5,2,2",
+      "4,3,1,2",
+      "5,6,1,3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
   def testRowTimeTumbleWindow(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment