You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/04/07 14:36:54 UTC

[1/2] flink git commit: [FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

Repository: flink
Updated Branches:
  refs/heads/master 635394751 -> 0038da415


[FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

This closes #3693.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa7907ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa7907ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa7907ab

Branch: refs/heads/master
Commit: fa7907ab0e90d182b6386802c97f9b4e001dc440
Parents: 6353947
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Apr 7 01:08:12 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 7 14:07:05 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  67 +++++++++-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   5 +-
 .../common/WindowStartEndPropertiesRule.scala   | 122 +++++++++++++++++++
 .../flink/table/validate/FunctionCatalog.scala  |   8 +-
 .../scala/batch/sql/WindowAggregateTest.scala   |  38 ++++--
 .../scala/stream/sql/WindowAggregateTest.scala  |  42 ++++---
 6 files changed, 252 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 6f96920..2a838c7 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1439,7 +1439,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
   <tbody>
     <tr>
       <td><code>TUMBLE(time_attr, interval)</code></td>
-      <td>Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
+      <td>Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
     <tr>
       <td><code>HOP(time_attr, interval, interval)</code></td>
@@ -1454,6 +1454,40 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 
 For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`. 
 
+#### Selecting Group Window Start and End Timestamps
+
+The start and end timestamps of group windows can be selected with the following auxiliary functions:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Auxiliary Function</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td>
+        <code>TUMBLE_START(time_attr, interval)</code><br/>
+        <code>HOP_START(time_attr, interval, interval)</code><br/>
+        <code>SESSION_START(time_attr, interval)</code><br/>
+      </td>
+      <td>Returns the start timestamp of the corresponding tumbling, hopping, or session window.</td>
+    </tr>
+    <tr>
+      <td>
+        <code>TUMBLE_END(time_attr, interval)</code><br/>
+        <code>HOP_END(time_attr, interval, interval)</code><br/>
+        <code>SESSION_END(time_attr, interval)</code><br/>
+      </td>
+      <td>Returns the end timestamp of the corresponding tumbling, hopping, or session window.</td>
+    </tr>
+  </tbody>
+</table>
+
+Note that the auxiliary functions must be called with exactly the same arguments as the group window function in the `GROUP BY` clause.
+
 The following examples show how to specify SQL queries with group windows on streaming tables. 
 
 <div class="codetabs" markdown="1">
@@ -1469,7 +1503,10 @@ tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 
 // compute SUM(amount) per day (in event-time)
 Table result1 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+  "SELECT user, " +
+  "  TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart,  " +
+  "  SUM(amount) FROM Orders " + 
+  "GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
 
 // compute SUM(amount) per day (in processing-time)
 Table result2 = tableEnv.sql(
@@ -1481,7 +1518,12 @@ Table result3 = tableEnv.sql(
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
 Table result4 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
+  "SELECT user, " +
+  "  SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart, " +
+  "  SESSION_END(rowtime(), INTERVAL '12' HOUR) AS sEnd, " +
+  "  SUM(amount) " +
+  "FROM Orders " +
+  "GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
 
 {% endhighlight %}
 </div>
@@ -1498,7 +1540,14 @@ tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
 
 // compute SUM(amount) per day (in event-time)
 val result1 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")
+    """
+      |SELECT
+      |  user, 
+      |  TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart,
+      |  SUM(amount)
+      | FROM Orders
+      | GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user
+    """.stripMargin)
 
 // compute SUM(amount) per day (in processing-time)
 val result2 = tableEnv.sql(
@@ -1510,7 +1559,15 @@ val result3 = tableEnv.sql(
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
 val result4 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")
+    """
+      |SELECT
+      |  user, 
+      |  SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart,
+      |  SESSION_END(rowtime(), INTERVAL '12' HOUR) AS sEnd,
+      |  SUM(amount)
+      | FROM Orders
+      | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
+    """.stripMargin)
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 222021a..ca55473 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule
+import org.apache.flink.table.plan.rules.common.WindowStartEndPropertiesRule
 import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
 
@@ -38,7 +39,8 @@ object FlinkRuleSets {
     ProjectToWindowRule.PROJECT,
 
     // Transform window to LogicalWindowAggregate
-    DataSetLogicalWindowAggregateRule.INSTANCE
+    DataSetLogicalWindowAggregateRule.INSTANCE,
+    WindowStartEndPropertiesRule.INSTANCE
   )
 
   /**
@@ -136,6 +138,7 @@ object FlinkRuleSets {
   val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
     // Transform window to LogicalWindowAggregate
     DataStreamLogicalWindowAggregateRule.INSTANCE,
+    WindowStartEndPropertiesRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
new file mode 100644
index 0000000..c68da04
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+class WindowStartEndPropertiesRule // rewrites window START/END auxiliary calls into window property accesses
+  extends RelOptRule(
+    WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
+    "WindowStartEndPropertiesRule") {
+  // Operands (see companion object): LogicalProject <- LogicalProject <- LogicalWindowAggregate.
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val project = call.rel(0).asInstanceOf[LogicalProject]
+    // fire only if some top-level project expression is a group auxiliary call (e.g. TUMBLE_START)
+    project.getProjects.exists {
+      case c: RexCall => c.getOperator.isGroupAuxiliary
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val project = call.rel(0).asInstanceOf[LogicalProject]
+    val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+    val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+    // push a new LogicalWindowAggregate, built from `agg`, that also emits w$start and w$end
+    val transformed = call.builder()
+    val rexBuilder = transformed.getRexBuilder
+    transformed.push(LogicalWindowAggregate.create(
+      agg.getWindow,
+      Seq( // NOTE(review): alias.get below assumes the window alias is always defined
+        NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
+        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
+      ), agg)
+    )
+
+    // forward the inner projection's expressions plus the new w$start and w$end fields
+    transformed.project(
+      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
+
+    // replace top-level window auxiliary calls by accesses to the window properties
+    transformed.project(
+      project.getProjects.map{ x =>
+        if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+          // cast w$start to the type of the replaced *_START auxiliary call
+          rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
+        } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+          // cast w$end to the type of the replaced *_END auxiliary call
+          rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
+        } else {
+          // preserve expression; NOTE(review): auxiliary calls nested inside it are not rewritten
+          x
+        }
+      }
+    )
+    val res = transformed.build() // Project <- Project <- LogicalWindowAggregate
+    call.transformTo(res) // register the rewritten plan as equivalent to the matched one
+  }
+}
+
+object WindowStartEndPropertiesRule {
+  private val WINDOW_EXPRESSION_RULE_PREDICATE = // LogicalProject <- LogicalProject <- LogicalWindowAggregate
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalProject],
+        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
+
+  val INSTANCE = new WindowStartEndPropertiesRule // shared instance added to the rule sets in FlinkRuleSets
+
+  /** Returns true iff `node` is a group auxiliary call to TUMBLE_START, HOP_START or SESSION_START. */
+  private def isWindowStart(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case SqlStdOperatorTable.TUMBLE_START |
+               SqlStdOperatorTable.HOP_START |
+               SqlStdOperatorTable.SESSION_START
+            => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  /** Returns true iff `node` is a group auxiliary call to TUMBLE_END, HOP_END or SESSION_END. */
+  private def isWindowEnd(node: RexNode): Boolean = {
+    node match {
+      case n: RexCall if n.getOperator.isGroupAuxiliary =>
+        n.getOperator match {
+          case SqlStdOperatorTable.TUMBLE_END |
+               SqlStdOperatorTable.HOP_END |
+               SqlStdOperatorTable.SESSION_END
+            => true
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6b8fd95..74b371a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -352,8 +352,14 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     EventTimeExtractor,
     ProcTimeExtractor,
     SqlStdOperatorTable.TUMBLE,
+    SqlStdOperatorTable.TUMBLE_START,
+    SqlStdOperatorTable.TUMBLE_END,
     SqlStdOperatorTable.HOP,
-    SqlStdOperatorTable.SESSION
+    SqlStdOperatorTable.HOP_START,
+    SqlStdOperatorTable.HOP_END,
+    SqlStdOperatorTable.SESSION,
+    SqlStdOperatorTable.SESSION_START,
+    SqlStdOperatorTable.SESSION_END
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index e84ede6..25a81c4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -63,7 +63,14 @@ class WindowAggregateTest extends TableTestBase {
     util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
 
     val sqlQuery =
-      "SELECT c, SUM(a) AS sumA, MIN(b) AS minB FROM T GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
+      "SELECT " +
+        "  TUMBLE_START(ts, INTERVAL '4' MINUTE), " +
+        "  TUMBLE_END(ts, INTERVAL '4' MINUTE), " +
+        "  c, " +
+        "  SUM(a) AS sumA, " +
+        "  MIN(b) AS minB " +
+        "FROM T " +
+        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
 
     val expected =
       unaryNode(
@@ -73,9 +80,10 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("groupBy", "c"),
           term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 240000.millis)),
-          term("select", "c, SUM(a) AS sumA, MIN(b) AS minB")
+          term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
+            "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "c, sumA, minB")
+        term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -117,7 +125,12 @@ class WindowAggregateTest extends TableTestBase {
     util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
 
     val sqlQuery =
-      "SELECT c, SUM(a) AS sumA, AVG(b) AS avgB " +
+      "SELECT " +
+        "  c, " +
+        "  HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
+        "  HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
+        "  SUM(a) AS sumA, " +
+        "  AVG(b) AS avgB " +
         "FROM T " +
         "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
 
@@ -130,9 +143,10 @@ class WindowAggregateTest extends TableTestBase {
           term("groupBy", "c, d"),
           term("window",
             EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)),
-          term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB")
+          term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
+            "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "c, sumA, avgB")
+        term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, sumA, avgB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -171,7 +185,12 @@ class WindowAggregateTest extends TableTestBase {
     util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
 
     val sqlQuery =
-      "SELECT c, d, SUM(a) AS sumA, MIN(b) AS minB " +
+      "SELECT " +
+        "  c, d, " +
+        "  SESSION_START(ts, INTERVAL '12' HOUR), " +
+        "  SESSION_END(ts, INTERVAL '12' HOUR), " +
+        "  SUM(a) AS sumA, " +
+        "  MIN(b) AS minB " +
         "FROM T " +
         "GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
 
@@ -183,9 +202,10 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("groupBy", "c, d"),
           term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 43200000.millis)),
-          term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB")
+          term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
+            "start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "c, d, sumA, minB")
+        term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f4befa6..6f03bec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.table.api.scala.stream.sql
 
-import java.sql.Timestamp
-
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, ProcessingTimeTumblingGroupWindow}
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow}
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
@@ -86,7 +84,14 @@ class WindowAggregateTest extends TableTestBase {
 
   @Test
   def testTumbleFunction() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
+
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " +
+        "  TUMBLE_END(rowtime(), INTERVAL '15' MINUTE) " + // trailing space keeps ")" and FROM apart
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -98,17 +103,21 @@ class WindowAggregateTest extends TableTestBase {
             term("select", "1970-01-01 00:00:00 AS $f0")
           ),
           term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 900000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
+          term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "EXPR$0")
+        term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }
 
   @Test
   def testHoppingFunction() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
-      "HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
+    val sql =
+      "SELECT COUNT(*), " +
+        "  HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
+        "  HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
+        "FROM MyTable " +
+        "GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -121,17 +130,22 @@ class WindowAggregateTest extends TableTestBase {
           ),
           term("window", ProcessingTimeSlidingGroupWindow(Some('w$),
             3600000.millis, 900000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
+          term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "EXPR$0")
+        term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }
 
   @Test
   def testSessionFunction() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
-      "SESSION(proctime(), INTERVAL '15' MINUTE)"
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  SESSION_START(proctime(), INTERVAL '15' MINUTE), " +
+        "  SESSION_END(proctime(), INTERVAL '15' MINUTE) " +
+        "FROM MyTable " +
+        "GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -143,9 +157,9 @@ class WindowAggregateTest extends TableTestBase {
             term("select", "1970-01-01 00:00:00 AS $f0")
           ),
           term("window", ProcessingTimeSessionGroupWindow(Some('w$), 900000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
+          term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
         ),
-        term("select", "EXPR$0")
+        term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }


[2/2] flink git commit: [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.

Posted by fh...@apache.org.
[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.

This closes #3695.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0038da41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0038da41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0038da41

Branch: refs/heads/master
Commit: 0038da41553908a427dd20be75838cccb48c6bcf
Parents: fa7907a
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Apr 7 17:46:06 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 7 14:09:56 2017 +0200

----------------------------------------------------------------------
 ...nkAggregateExpandDistinctAggregatesRule.java | 1158 ------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |    8 +-
 2 files changed, 3 insertions(+), 1163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
deleted file mode 100644
index 9d4e08e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite.rules;
-
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-
-import org.apache.flink.util.Preconditions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- Copy calcite's AggregateExpandDistinctAggregatesRule to Flink project,
- and do a quick fix to avoid some bad case mentioned in CALCITE-1558.
- Should drop it and use calcite's AggregateExpandDistinctAggregatesRule
- when we upgrade to calcite 1.12(above)
- */
-
-/**
- * Planner rule that expands distinct aggregates
- * (such as {@code COUNT(DISTINCT x)}) from a
- * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
- *
- * <p>How this is done depends upon the arguments to the function. If all
- * functions have the same argument
- * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
- * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
- * sufficient.
- *
- * <p>If there are multiple arguments
- * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
- * the rule creates separate {@code Aggregate}s and combines using a
- * {@link org.apache.calcite.rel.core.Join}.
- */
-public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
-	//~ Static fields/initializers ---------------------------------------------
-
-	/** The default instance of the rule; operates only on logical expressions. */
-	public static final FlinkAggregateExpandDistinctAggregatesRule INSTANCE =
-			new FlinkAggregateExpandDistinctAggregatesRule(LogicalAggregate.class, true,
-					RelFactories.LOGICAL_BUILDER);
-
-	/** Instance of the rule that operates only on logical expressions and
-	 * generates a join. */
-	public static final FlinkAggregateExpandDistinctAggregatesRule JOIN =
-			new FlinkAggregateExpandDistinctAggregatesRule(LogicalAggregate.class, false,
-					RelFactories.LOGICAL_BUILDER);
-
-	private static final BigDecimal TWO = BigDecimal.valueOf(2L);
-
-	public final boolean useGroupingSets;
-
-	//~ Constructors -----------------------------------------------------------
-
-	public FlinkAggregateExpandDistinctAggregatesRule(
-			Class<? extends LogicalAggregate> clazz,
-			boolean useGroupingSets,
-			RelBuilderFactory relBuilderFactory) {
-		super(operand(clazz, any()), relBuilderFactory, null);
-		this.useGroupingSets = useGroupingSets;
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateExpandDistinctAggregatesRule(
-			Class<? extends LogicalAggregate> clazz,
-			boolean useGroupingSets,
-			RelFactories.JoinFactory joinFactory) {
-		this(clazz, useGroupingSets, RelBuilder.proto(Contexts.of(joinFactory)));
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateExpandDistinctAggregatesRule(
-			Class<? extends LogicalAggregate> clazz,
-			RelFactories.JoinFactory joinFactory) {
-		this(clazz, false, RelBuilder.proto(Contexts.of(joinFactory)));
-	}
-
-	//~ Methods ----------------------------------------------------------------
-
-	public void onMatch(RelOptRuleCall call) {
-		final Aggregate aggregate = call.rel(0);
-		if (!aggregate.containsDistinctCall()) {
-			return;
-		}
-
-		// Find all of the agg expressions. We use a LinkedHashSet to ensure
-		// determinism.
-		int nonDistinctCount = 0;
-		int distinctCount = 0;
-		int filterCount = 0;
-		int unsupportedAggCount = 0;
-		final Set<Pair<List<Integer>, Integer>> argLists = new LinkedHashSet<>();
-		for (AggregateCall aggCall : aggregate.getAggCallList()) {
-			if (aggCall.filterArg >= 0) {
-				++filterCount;
-			}
-			if (!aggCall.isDistinct()) {
-				++nonDistinctCount;
-				if (!(aggCall.getAggregation() instanceof SqlCountAggFunction
-						|| aggCall.getAggregation() instanceof SqlSumAggFunction
-						|| aggCall.getAggregation() instanceof SqlMinMaxAggFunction)) {
-					++unsupportedAggCount;
-				}
-				continue;
-			}
-			++distinctCount;
-			argLists.add(Pair.of(aggCall.getArgList(), aggCall.filterArg));
-		}
-		Preconditions.checkState(argLists.size() > 0, "containsDistinctCall lied");
-
-		// If all of the agg expressions are distinct and have the same
-		// arguments then we can use a more efficient form.
-		if (nonDistinctCount == 0 && argLists.size() == 1) {
-			final Pair<List<Integer>, Integer> pair =
-					Iterables.getOnlyElement(argLists);
-			final RelBuilder relBuilder = call.builder();
-			convertMonopole(relBuilder, aggregate, pair.left, pair.right);
-			call.transformTo(relBuilder.build());
-			return;
-		}
-
-		if (useGroupingSets) {
-			rewriteUsingGroupingSets(call, aggregate, argLists);
-			return;
-		}
-
-		// If only one distinct aggregate and one or more non-distinct aggregates,
-		// we can generate multi-phase aggregates
-		if (distinctCount == 1 // one distinct aggregate
-				&& filterCount == 0 // no filter
-				&& unsupportedAggCount == 0 // sum/min/max/count in non-distinct aggregate
-				&& nonDistinctCount > 0) { // one or more non-distinct aggregates
-			final RelBuilder relBuilder = call.builder();
-			convertSingletonDistinct(relBuilder, aggregate, argLists);
-			call.transformTo(relBuilder.build());
-			return;
-		}
-
-		// Create a list of the expressions which will yield the final result.
-		// Initially, the expressions point to the input field.
-		final List<RelDataTypeField> aggFields =
-				aggregate.getRowType().getFieldList();
-		final List<RexInputRef> refs = new ArrayList<>();
-		final List<String> fieldNames = aggregate.getRowType().getFieldNames();
-		final ImmutableBitSet groupSet = aggregate.getGroupSet();
-		final int groupAndIndicatorCount =
-				aggregate.getGroupCount() + aggregate.getIndicatorCount();
-		for (int i : Util.range(groupAndIndicatorCount)) {
-			refs.add(RexInputRef.of(i, aggFields));
-		}
-
-		// Aggregate the original relation, including any non-distinct aggregates.
-		final List<AggregateCall> newAggCallList = new ArrayList<>();
-		int i = -1;
-		for (AggregateCall aggCall : aggregate.getAggCallList()) {
-			++i;
-			if (aggCall.isDistinct()) {
-				refs.add(null);
-				continue;
-			}
-			refs.add(
-					new RexInputRef(
-							groupAndIndicatorCount + newAggCallList.size(),
-							aggFields.get(groupAndIndicatorCount + i).getType()));
-			newAggCallList.add(aggCall);
-		}
-
-		// In the case where there are no non-distinct aggregates (regardless of
-		// whether there are group bys), there's no need to generate the
-		// extra aggregate and join.
-		final RelBuilder relBuilder = call.builder();
-		relBuilder.push(aggregate.getInput());
-		int n = 0;
-		if (!newAggCallList.isEmpty()) {
-			final RelBuilder.GroupKey groupKey =
-					relBuilder.groupKey(groupSet, aggregate.indicator, aggregate.getGroupSets());
-			relBuilder.aggregate(groupKey, newAggCallList);
-			++n;
-		}
-
-		// For each set of operands, find and rewrite all calls which have that
-		// set of operands.
-		for (Pair<List<Integer>, Integer> argList : argLists) {
-			doRewrite(relBuilder, aggregate, n++, argList.left, argList.right, refs);
-		}
-
-		relBuilder.project(refs, fieldNames);
-		call.transformTo(relBuilder.build());
-	}
-
-	/**
-	 * Converts an aggregate with one distinct aggregate and one or more
-	 * non-distinct aggregates to multi-phase aggregates (see reference example
-	 * below).
-	 *
-	 * @param relBuilder Contains the input relational expression
-	 * @param aggregate  Original aggregate
-	 * @param argLists   Arguments and filters to the distinct aggregate function
-	 *
-	 */
-	private RelBuilder convertSingletonDistinct(RelBuilder relBuilder,
-											Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
-		// For example,
-		//	SELECT deptno, COUNT(*), SUM(bonus), MIN(DISTINCT sal)
-		//	FROM emp
-		//	GROUP BY deptno
-		//
-		// becomes
-		//
-		//	SELECT deptno, SUM(cnt), SUM(bonus), MIN(sal)
-		//	FROM (
-		//		  SELECT deptno, COUNT(*) as cnt, SUM(bonus), sal
-		//		  FROM EMP
-		//		  GROUP BY deptno, sal)			// Aggregate B
-		//	GROUP BY deptno						// Aggregate A
-		relBuilder.push(aggregate.getInput());
-		final List<Pair<RexNode, String>> projects = new ArrayList<>();
-		final Map<Integer, Integer> sourceOf = new HashMap<>();
-		SortedSet<Integer> newGroupSet = new TreeSet<>();
-		final List<RelDataTypeField> childFields =
-				relBuilder.peek().getRowType().getFieldList();
-		final boolean hasGroupBy = aggregate.getGroupSet().size() > 0;
-
-		SortedSet<Integer> groupSet = new TreeSet<>(aggregate.getGroupSet().asList());
-
-		// Add the distinct aggregate column(s) to the group-by columns,
-		// if not already a part of the group-by
-		newGroupSet.addAll(aggregate.getGroupSet().asList());
-		for (Pair<List<Integer>, Integer> argList : argLists) {
-			newGroupSet.addAll(argList.getKey());
-		}
-
-		// Re-map the arguments to the aggregate A. These arguments will get
-		// remapped because of the intermediate aggregate B generated as part of the
-		// transformation.
-		for (int arg : newGroupSet) {
-			sourceOf.put(arg, projects.size());
-			projects.add(RexInputRef.of2(arg, childFields));
-		}
-		// Generate the intermediate aggregate B
-		final List<AggregateCall> aggCalls = aggregate.getAggCallList();
-		final List<AggregateCall> newAggCalls = new ArrayList<>();
-		final List<Integer> fakeArgs = new ArrayList<>();
-		final Map<AggregateCall, Integer> callArgMap = new HashMap<>();
-		// First identify the real arguments, then use the rest for fake arguments
-		// e.g. if real arguments are 0, 1, 3. Then the fake arguments will be 2, 4
-		for (final AggregateCall aggCall : aggCalls) {
-			if (!aggCall.isDistinct()) {
-				for (int arg : aggCall.getArgList()) {
-					if (!groupSet.contains(arg)) {
-						sourceOf.put(arg, projects.size());
-					}
-				}
-			}
-		}
-		int fakeArg0 = 0;
-		for (final AggregateCall aggCall : aggCalls) {
-			// We will deal with non-distinct aggregates below
-			if (!aggCall.isDistinct()) {
-				boolean isGroupKeyUsedInAgg = false;
-				for (int arg : aggCall.getArgList()) {
-					if (groupSet.contains(arg)) {
-						isGroupKeyUsedInAgg = true;
-						break;
-					}
-				}
-				if (aggCall.getArgList().size() == 0 || isGroupKeyUsedInAgg) {
-					while (sourceOf.get(fakeArg0) != null) {
-						++fakeArg0;
-					}
-					fakeArgs.add(fakeArg0);
-					++fakeArg0;
-				}
-			}
-		}
-		for (final AggregateCall aggCall : aggCalls) {
-			if (!aggCall.isDistinct()) {
-				for (int arg : aggCall.getArgList()) {
-					if (!groupSet.contains(arg)) {
-						sourceOf.remove(arg);
-					}
-				}
-			}
-		}
-		// Compute the remapped arguments using fake arguments for non-distinct
-		// aggregates with no arguments e.g. count(*).
-		int fakeArgIdx = 0;
-		for (final AggregateCall aggCall : aggCalls) {
-			// Project the column corresponding to the distinct aggregate. Project
-			// as-is all the non-distinct aggregates
-			if (!aggCall.isDistinct()) {
-				final AggregateCall newCall =
-						AggregateCall.create(aggCall.getAggregation(), false,
-								aggCall.getArgList(), -1,
-								ImmutableBitSet.of(newGroupSet).cardinality(),
-								relBuilder.peek(), null, aggCall.name);
-				newAggCalls.add(newCall);
-				if (newCall.getArgList().size() == 0) {
-					int fakeArg = fakeArgs.get(fakeArgIdx);
-					callArgMap.put(newCall, fakeArg);
-					sourceOf.put(fakeArg, projects.size());
-					projects.add(
-							Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
-									newCall.getName()));
-					++fakeArgIdx;
-				} else {
-					for (int arg : newCall.getArgList()) {
-						if (groupSet.contains(arg)) {
-							int fakeArg = fakeArgs.get(fakeArgIdx);
-							callArgMap.put(newCall, fakeArg);
-							sourceOf.put(fakeArg, projects.size());
-							projects.add(
-									Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
-											newCall.getName()));
-							++fakeArgIdx;
-						} else {
-							sourceOf.put(arg, projects.size());
-							projects.add(
-									Pair.of((RexNode) new RexInputRef(arg, newCall.getType()),
-											newCall.getName()));
-						}
-					}
-				}
-			}
-		}
-		// Generate the aggregate B (see the reference example above)
-		relBuilder.push(
-				aggregate.copy(
-						aggregate.getTraitSet(), relBuilder.build(),
-						false, ImmutableBitSet.of(newGroupSet), null, newAggCalls));
-		// Convert the existing aggregate to aggregate A (see the reference example above)
-		final List<AggregateCall> newTopAggCalls =
-				Lists.newArrayList(aggregate.getAggCallList());
-		// Use the remapped arguments for the (non)distinct aggregate calls
-		for (int i = 0; i < newTopAggCalls.size(); i++) {
-			// Re-map arguments.
-			final AggregateCall aggCall = newTopAggCalls.get(i);
-			final int argCount = aggCall.getArgList().size();
-			final List<Integer> newArgs = new ArrayList<>(argCount);
-			final AggregateCall newCall;
-
-
-			for (int j = 0; j < argCount; j++) {
-				final Integer arg = aggCall.getArgList().get(j);
-				if (callArgMap.containsKey(aggCall)) {
-					newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
-				}
-				else {
-					newArgs.add(sourceOf.get(arg));
-				}
-			}
-			if (aggCall.isDistinct()) {
-				newCall =
-						AggregateCall.create(aggCall.getAggregation(), false, newArgs,
-								-1, aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-								aggCall.getType(), aggCall.name);
-			} else {
-				// If aggregate B had a COUNT aggregate call the corresponding aggregate at
-				// aggregate A must be SUM. For other aggregates, it remains the same.
-				if (aggCall.getAggregation() instanceof SqlCountAggFunction) {
-					if (aggCall.getArgList().size() == 0) {
-						newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
-					}
-					if (hasGroupBy) {
-						SqlSumAggFunction sumAgg = new SqlSumAggFunction(null);
-						newCall =
-								AggregateCall.create(sumAgg, false, newArgs, -1,
-										aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-										aggCall.getType(), aggCall.getName());
-					} else {
-						SqlSumEmptyIsZeroAggFunction sumAgg = new SqlSumEmptyIsZeroAggFunction();
-						newCall =
-								AggregateCall.create(sumAgg, false, newArgs, -1,
-										aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-										aggCall.getType(), aggCall.getName());
-					}
-				} else {
-					newCall =
-							AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
-									aggregate.getGroupSet().cardinality(),
-									relBuilder.peek(), aggCall.getType(), aggCall.name);
-				}
-			}
-			newTopAggCalls.set(i, newCall);
-		}
-		// Populate the group-by keys with the remapped arguments for aggregate A
-		newGroupSet.clear();
-		for (int arg : aggregate.getGroupSet()) {
-			newGroupSet.add(sourceOf.get(arg));
-		}
-		relBuilder.push(
-				aggregate.copy(aggregate.getTraitSet(),
-						relBuilder.build(), aggregate.indicator,
-						ImmutableBitSet.of(newGroupSet), null, newTopAggCalls));
-		return relBuilder;
-	}
-	/*
-	public RelBuilder convertSingletonDistinct(RelBuilder relBuilder,
-											   Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
-		// For example,
-		//	SELECT deptno, COUNT(*), SUM(bonus), MIN(DISTINCT sal)
-		//	FROM emp
-		//	GROUP BY deptno
-		//
-		// becomes
-		//
-		//	SELECT deptno, SUM(cnt), SUM(bonus), MIN(sal)
-		//	FROM (
-		//		  SELECT deptno, COUNT(*) as cnt, SUM(bonus), sal
-		//		  FROM EMP
-		//		  GROUP BY deptno, sal)			// Aggregate B
-		//	GROUP BY deptno						// Aggregate A
-		relBuilder.push(aggregate.getInput());
-		final List<Pair<RexNode, String>> projects = new ArrayList<>();
-		final Map<Integer, Integer> sourceOf = new HashMap<>();
-		SortedSet<Integer> newGroupSet = new TreeSet<>();
-		final List<RelDataTypeField> childFields =
-				relBuilder.peek().getRowType().getFieldList();
-		final boolean hasGroupBy = aggregate.getGroupSet().size() > 0;
-
-		// Add the distinct aggregate column(s) to the group-by columns,
-		// if not already a part of the group-by
-		newGroupSet.addAll(aggregate.getGroupSet().asList());
-		for (Pair<List<Integer>, Integer> argList : argLists) {
-			newGroupSet.addAll(argList.getKey());
-		}
-
-		// Re-map the arguments to the aggregate A. These arguments will get
-		// remapped because of the intermediate aggregate B generated as part of the
-		// transformation.
-		for (int arg : newGroupSet) {
-			sourceOf.put(arg, projects.size());
-			projects.add(RexInputRef.of2(arg, childFields));
-		}
-		// Generate the intermediate aggregate B
-		final List<AggregateCall> aggCalls = aggregate.getAggCallList();
-		final List<AggregateCall> newAggCalls = new ArrayList<>();
-		final List<Integer> fakeArgs = new ArrayList<>();
-		final Map<AggregateCall, Integer> callArgMap = new HashMap<>();
-		// First identify the real arguments, then use the rest for fake arguments
-		// e.g. if real arguments are 0, 1, 3. Then the fake arguments will be 2, 4
-		for (final AggregateCall aggCall : aggCalls) {
-			if (!aggCall.isDistinct()) {
-				for (int arg : aggCall.getArgList()) {
-					if (!sourceOf.containsKey(arg)) {
-						sourceOf.put(arg, projects.size());
-					}
-				}
-			}
-		}
-		int fakeArg0 = 0;
-		for (final AggregateCall aggCall : aggCalls) {
-			// We will deal with non-distinct aggregates below
-			if (!aggCall.isDistinct()) {
-				boolean isGroupKeyUsedInAgg = false;
-				for (int arg : aggCall.getArgList()) {
-					if (sourceOf.containsKey(arg)) {
-						isGroupKeyUsedInAgg = true;
-						break;
-					}
-				}
-				if (aggCall.getArgList().size() == 0 || isGroupKeyUsedInAgg) {
-					while (sourceOf.get(fakeArg0) != null) {
-						++fakeArg0;
-					}
-					fakeArgs.add(fakeArg0);
-				}
-			}
-		}
-		for (final AggregateCall aggCall : aggCalls) {
-			if (!aggCall.isDistinct()) {
-				for (int arg : aggCall.getArgList()) {
-					if (!sourceOf.containsKey(arg)) {
-						sourceOf.remove(arg);
-					}
-				}
-			}
-		}
-		// Compute the remapped arguments using fake arguments for non-distinct
-		// aggregates with no arguments e.g. count(*).
-		int fakeArgIdx = 0;
-		for (final AggregateCall aggCall : aggCalls) {
-			// Project the column corresponding to the distinct aggregate. Project
-			// as-is all the non-distinct aggregates
-			if (!aggCall.isDistinct()) {
-				final AggregateCall newCall =
-						AggregateCall.create(aggCall.getAggregation(), false,
-								aggCall.getArgList(), -1,
-								ImmutableBitSet.of(newGroupSet).cardinality(),
-								relBuilder.peek(), null, aggCall.name);
-				newAggCalls.add(newCall);
-				if (newCall.getArgList().size() == 0) {
-					int fakeArg = fakeArgs.get(fakeArgIdx);
-					callArgMap.put(newCall, fakeArg);
-					sourceOf.put(fakeArg, projects.size());
-					projects.add(
-							Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
-									newCall.getName()));
-					++fakeArgIdx;
-				} else {
-					for (int arg : newCall.getArgList()) {
-						if (sourceOf.containsKey(arg)) {
-							int fakeArg = fakeArgs.get(fakeArgIdx);
-							callArgMap.put(newCall, fakeArg);
-							sourceOf.put(fakeArg, projects.size());
-							projects.add(
-									Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
-											newCall.getName()));
-							++fakeArgIdx;
-						} else {
-							sourceOf.put(arg, projects.size());
-							projects.add(
-									Pair.of((RexNode) new RexInputRef(arg, newCall.getType()),
-											newCall.getName()));
-						}
-					}
-				}
-			}
-		}
-		// Generate the aggregate B (see the reference example above)
-		relBuilder.push(
-				aggregate.copy(
-						aggregate.getTraitSet(), relBuilder.build(),
-						false, ImmutableBitSet.of(newGroupSet), null, newAggCalls));
-		// Convert the existing aggregate to aggregate A (see the reference example above)
-		final List<AggregateCall> newTopAggCalls =
-				Lists.newArrayList(aggregate.getAggCallList());
-		// Use the remapped arguments for the (non)distinct aggregate calls
-		for (int i = 0; i < newTopAggCalls.size(); i++) {
-			// Re-map arguments.
-			final AggregateCall aggCall = newTopAggCalls.get(i);
-			final int argCount = aggCall.getArgList().size();
-			final List<Integer> newArgs = new ArrayList<>(argCount);
-			final AggregateCall newCall;
-
-
-			for (int j = 0; j < argCount; j++) {
-				final Integer arg = aggCall.getArgList().get(j);
-				if (callArgMap.containsKey(aggCall)) {
-					newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
-				}
-				else {
-					newArgs.add(sourceOf.get(arg));
-				}
-			}
-			if (aggCall.isDistinct()) {
-				newCall =
-						AggregateCall.create(aggCall.getAggregation(), false, newArgs,
-								-1, aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-								aggCall.getType(), aggCall.name);
-			} else {
-				// If aggregate B had a COUNT aggregate call the corresponding aggregate at
-				// aggregate A must be SUM. For other aggregates, it remains the same.
-				if (aggCall.getAggregation() instanceof SqlCountAggFunction) {
-					if (aggCall.getArgList().size() == 0) {
-						newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
-					}
-					if (hasGroupBy) {
-						SqlSumAggFunction sumAgg = new SqlSumAggFunction(null);
-						newCall =
-								AggregateCall.create(sumAgg, false, newArgs, -1,
-										aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-										aggCall.getType(), aggCall.getName());
-					} else {
-						SqlSumEmptyIsZeroAggFunction sumAgg = new SqlSumEmptyIsZeroAggFunction();
-						newCall =
-								AggregateCall.create(sumAgg, false, newArgs, -1,
-										aggregate.getGroupSet().cardinality(), relBuilder.peek(),
-										aggCall.getType(), aggCall.getName());
-					}
-				} else {
-					newCall =
-							AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
-									aggregate.getGroupSet().cardinality(),
-									relBuilder.peek(), aggCall.getType(), aggCall.name);
-				}
-			}
-			newTopAggCalls.set(i, newCall);
-		}
-		// Populate the group-by keys with the remapped arguments for aggregate A
-		newGroupSet.clear();
-		for (int arg : aggregate.getGroupSet()) {
-			newGroupSet.add(sourceOf.get(arg));
-		}
-		relBuilder.push(
-				aggregate.copy(aggregate.getTraitSet(),
-						relBuilder.build(), aggregate.indicator,
-						ImmutableBitSet.of(newGroupSet), null, newTopAggCalls));
-		return relBuilder;
-	}
-	*/
-
-	@SuppressWarnings("DanglingJavadoc")
-	private void rewriteUsingGroupingSets(RelOptRuleCall call,
-										Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
-		final Set<ImmutableBitSet> groupSetTreeSet =
-				new TreeSet<>(ImmutableBitSet.ORDERING);
-		groupSetTreeSet.add(aggregate.getGroupSet());
-		for (Pair<List<Integer>, Integer> argList : argLists) {
-			groupSetTreeSet.add(
-					ImmutableBitSet.of(argList.left)
-							.setIf(argList.right, argList.right >= 0)
-							.union(aggregate.getGroupSet()));
-		}
-
-		final ImmutableList<ImmutableBitSet> groupSets =
-				ImmutableList.copyOf(groupSetTreeSet);
-		final ImmutableBitSet fullGroupSet = ImmutableBitSet.union(groupSets);
-
-		final List<AggregateCall> distinctAggCalls = new ArrayList<>();
-		for (Pair<AggregateCall, String> aggCall : aggregate.getNamedAggCalls()) {
-			if (!aggCall.left.isDistinct()) {
-				distinctAggCalls.add(aggCall.left.rename(aggCall.right));
-			}
-		}
-
-		final RelBuilder relBuilder = call.builder();
-		relBuilder.push(aggregate.getInput());
-		relBuilder.aggregate(relBuilder.groupKey(fullGroupSet, groupSets.size() > 1, groupSets),
-				distinctAggCalls);
-		final RelNode distinct = relBuilder.peek();
-		final int groupCount = fullGroupSet.cardinality();
-		final int indicatorCount = groupSets.size() > 1 ? groupCount : 0;
-
-		final RelOptCluster cluster = aggregate.getCluster();
-		final RexBuilder rexBuilder = cluster.getRexBuilder();
-		final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
-		final RelDataType booleanType =
-				typeFactory.createTypeWithNullability(
-						typeFactory.createSqlType(SqlTypeName.BOOLEAN), false);
-		final List<Pair<RexNode, String>> predicates = new ArrayList<>();
-		final Map<ImmutableBitSet, Integer> filters = new HashMap<>();
-
-		/** Function to register a filter for a group set. */
-		class Registrar {
-			RexNode group = null;
-
-			private int register(ImmutableBitSet groupSet) {
-				if (group == null) {
-					group = makeGroup(groupCount - 1);
-				}
-				final RexNode node =
-						rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, group,
-								rexBuilder.makeExactLiteral(
-										toNumber(remap(fullGroupSet, groupSet))));
-				predicates.add(Pair.of(node, toString(groupSet)));
-				return groupCount + indicatorCount + distinctAggCalls.size()
-						+ predicates.size() - 1;
-			}
-
-			private RexNode makeGroup(int i) {
-				final RexInputRef ref =
-						rexBuilder.makeInputRef(booleanType, groupCount + i);
-				final RexNode kase =
-						rexBuilder.makeCall(SqlStdOperatorTable.CASE, ref,
-								rexBuilder.makeExactLiteral(BigDecimal.ZERO),
-								rexBuilder.makeExactLiteral(TWO.pow(i)));
-				if (i == 0) {
-					return kase;
-				} else {
-					return rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
-							makeGroup(i - 1), kase);
-				}
-			}
-
-			private BigDecimal toNumber(ImmutableBitSet bitSet) {
-				BigDecimal n = BigDecimal.ZERO;
-				for (int key : bitSet) {
-					n = n.add(TWO.pow(key));
-				}
-				return n;
-			}
-
-			private String toString(ImmutableBitSet bitSet) {
-				final StringBuilder buf = new StringBuilder("$i");
-				for (int key : bitSet) {
-					buf.append(key).append('_');
-				}
-				return buf.substring(0, buf.length() - 1);
-			}
-		}
-		final Registrar registrar = new Registrar();
-		for (ImmutableBitSet groupSet : groupSets) {
-			filters.put(groupSet, registrar.register(groupSet));
-		}
-
-		if (!predicates.isEmpty()) {
-			List<Pair<RexNode, String>> nodes = new ArrayList<>();
-			for (RelDataTypeField f : relBuilder.peek().getRowType().getFieldList()) {
-				final RexNode node = rexBuilder.makeInputRef(f.getType(), f.getIndex());
-				nodes.add(Pair.of(node, f.getName()));
-			}
-			nodes.addAll(predicates);
-			relBuilder.project(Pair.left(nodes), Pair.right(nodes));
-		}
-
-		int x = groupCount + indicatorCount;
-		final List<AggregateCall> newCalls = new ArrayList<>();
-		for (AggregateCall aggCall : aggregate.getAggCallList()) {
-			final int newFilterArg;
-			final List<Integer> newArgList;
-			final SqlAggFunction aggregation;
-			if (!aggCall.isDistinct()) {
-				aggregation = SqlStdOperatorTable.MIN;
-				newArgList = ImmutableIntList.of(x++);
-				newFilterArg = filters.get(aggregate.getGroupSet());
-			} else {
-				aggregation = aggCall.getAggregation();
-				newArgList = remap(fullGroupSet, aggCall.getArgList());
-				newFilterArg =
-						filters.get(
-								ImmutableBitSet.of(aggCall.getArgList())
-										.setIf(aggCall.filterArg, aggCall.filterArg >= 0)
-										.union(aggregate.getGroupSet()));
-			}
-			final AggregateCall newCall =
-					AggregateCall.create(aggregation, false, newArgList, newFilterArg,
-							aggregate.getGroupCount(), distinct, null, aggCall.name);
-			newCalls.add(newCall);
-		}
-
-		relBuilder.aggregate(
-				relBuilder.groupKey(
-						remap(fullGroupSet, aggregate.getGroupSet()),
-						aggregate.indicator,
-						remap(fullGroupSet, aggregate.getGroupSets())),
-				newCalls);
-		relBuilder.convert(aggregate.getRowType(), true);
-		call.transformTo(relBuilder.build());
-	}
-
-	private static ImmutableBitSet remap(ImmutableBitSet groupSet,
-										ImmutableBitSet bitSet) {
-		final ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
-		for (Integer bit : bitSet) {
-			builder.set(remap(groupSet, bit));
-		}
-		return builder.build();
-	}
-
-	private static ImmutableList<ImmutableBitSet> remap(ImmutableBitSet groupSet,
-														Iterable<ImmutableBitSet> bitSets) {
-		final ImmutableList.Builder<ImmutableBitSet> builder =
-				ImmutableList.builder();
-		for (ImmutableBitSet bitSet : bitSets) {
-			builder.add(remap(groupSet, bitSet));
-		}
-		return builder.build();
-	}
-
-	private static List<Integer> remap(ImmutableBitSet groupSet,
-									List<Integer> argList) {
-		ImmutableIntList list = ImmutableIntList.of();
-		for (int arg : argList) {
-			list = list.append(remap(groupSet, arg));
-		}
-		return list;
-	}
-
-	private static int remap(ImmutableBitSet groupSet, int arg) {
-		return arg < 0 ? -1 : groupSet.indexOf(arg);
-	}
-
-	/**
-	 * Converts an aggregate relational expression that contains just one
-	 * distinct aggregate function (or perhaps several over the same arguments)
-	 * and no non-distinct aggregate functions.
-	 */
-	private RelBuilder convertMonopole(RelBuilder relBuilder, Aggregate aggregate,
-									List<Integer> argList, int filterArg) {
-		// For example,
-		//	SELECT deptno, COUNT(DISTINCT sal), SUM(DISTINCT sal)
-		//	FROM emp
-		//	GROUP BY deptno
-		//
-		// becomes
-		//
-		//	SELECT deptno, COUNT(distinct_sal), SUM(distinct_sal)
-		//	FROM (
-		//	  SELECT DISTINCT deptno, sal AS distinct_sal
-		//	  FROM EMP GROUP BY deptno)
-		//	GROUP BY deptno
-
-		// Project the columns of the GROUP BY plus the arguments
-		// to the agg function.
-		final Map<Integer, Integer> sourceOf = new HashMap<>();
-		createSelectDistinct(relBuilder, aggregate, argList, filterArg, sourceOf);
-
-		// Create an aggregate on top, with the new aggregate list.
-		final List<AggregateCall> newAggCalls =
-				Lists.newArrayList(aggregate.getAggCallList());
-		rewriteAggCalls(newAggCalls, argList, sourceOf);
-		final int cardinality = aggregate.getGroupSet().cardinality();
-		relBuilder.push(
-				aggregate.copy(aggregate.getTraitSet(), relBuilder.build(),
-						aggregate.indicator, ImmutableBitSet.range(cardinality), null,
-						newAggCalls));
-		return relBuilder;
-	}
-
-	/**
-	 * Converts all distinct aggregate calls to a given set of arguments.
-	 *
-	 * <p>This method is called several times, one for each set of arguments.
-	 * Each time it is called, it generates a JOIN to a new SELECT DISTINCT
-	 * relational expression, and modifies the set of top-level calls.
-	 *
-	 * @param aggregate Original aggregate
-	 * @param n		 Ordinal of this in a join. {@code relBuilder} contains the
-	 *				  input relational expression (either the original
-	 *				  aggregate, the output from the previous call to this
-	 *				  method. {@code n} is 0 if we're converting the
-	 *				  first distinct aggregate in a query with no non-distinct
-	 *				  aggregates)
-	 * @param argList   Arguments to the distinct aggregate function
-	 * @param filterArg Argument that filters input to aggregate function, or -1
-	 * @param refs	  Array of expressions which will be the projected by the
-	 *				  result of this rule. Those relating to this arg list will
-	 *				  be modified  @return Relational expression
-	 */
-	private void doRewrite(RelBuilder relBuilder, Aggregate aggregate, int n,
-						List<Integer> argList, int filterArg, List<RexInputRef> refs) {
-		final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
-		final List<RelDataTypeField> leftFields;
-		if (n == 0) {
-			leftFields = null;
-		} else {
-			leftFields = relBuilder.peek().getRowType().getFieldList();
-		}
-
-		// LogicalAggregate(
-		//	 child,
-		//	 {COUNT(DISTINCT 1), SUM(DISTINCT 1), SUM(2)})
-		//
-		// becomes
-		//
-		// LogicalAggregate(
-		//	 LogicalJoin(
-		//		 child,
-		//		 LogicalAggregate(child, < all columns > {}),
-		//		 INNER,
-		//		 <f2 = f5>))
-		//
-		// E.g.
-		//   SELECT deptno, SUM(DISTINCT sal), COUNT(DISTINCT gender), MAX(age)
-		//   FROM Emps
-		//   GROUP BY deptno
-		//
-		// becomes
-		//
-		//   SELECT e.deptno, adsal.sum_sal, adgender.count_gender, e.max_age
-		//   FROM (
-		//	 SELECT deptno, MAX(age) as max_age
-		//	 FROM Emps GROUP BY deptno) AS e
-		//   JOIN (
-		//	 SELECT deptno, COUNT(gender) AS count_gender FROM (
-		//	   SELECT DISTINCT deptno, gender FROM Emps) AS dgender
-		//	 GROUP BY deptno) AS adgender
-		//	 ON e.deptno = adgender.deptno
-		//   JOIN (
-		//	 SELECT deptno, SUM(sal) AS sum_sal FROM (
-		//	   SELECT DISTINCT deptno, sal FROM Emps) AS dsal
-		//	 GROUP BY deptno) AS adsal
-		//   ON e.deptno = adsal.deptno
-		//   GROUP BY e.deptno
-		//
-		// Note that if a query contains no non-distinct aggregates, then the
-		// very first join/group by is omitted.  In the example above, if
-		// MAX(age) is removed, then the sub-select of "e" is not needed, and
-		// instead the two other group by's are joined to one another.
-
-		// Project the columns of the GROUP BY plus the arguments
-		// to the agg function.
-		final Map<Integer, Integer> sourceOf = new HashMap<>();
-		createSelectDistinct(relBuilder, aggregate, argList, filterArg, sourceOf);
-
-		// Now compute the aggregate functions on top of the distinct dataset.
-		// Each distinct agg becomes a non-distinct call to the corresponding
-		// field from the right; for example,
-		//   "COUNT(DISTINCT e.sal)"
-		// becomes
-		//   "COUNT(distinct_e.sal)".
-		final List<AggregateCall> aggCallList = new ArrayList<>();
-		final List<AggregateCall> aggCalls = aggregate.getAggCallList();
-
-		final int groupAndIndicatorCount =
-				aggregate.getGroupCount() + aggregate.getIndicatorCount();
-		int i = groupAndIndicatorCount - 1;
-		for (AggregateCall aggCall : aggCalls) {
-			++i;
-
-			// Ignore agg calls which are not distinct or have the wrong set
-			// arguments. If we're rewriting aggs whose args are {sal}, we will
-			// rewrite COUNT(DISTINCT sal) and SUM(DISTINCT sal) but ignore
-			// COUNT(DISTINCT gender) or SUM(sal).
-			if (!aggCall.isDistinct()) {
-				continue;
-			}
-			if (!aggCall.getArgList().equals(argList)) {
-				continue;
-			}
-
-			// Re-map arguments.
-			final int argCount = aggCall.getArgList().size();
-			final List<Integer> newArgs = new ArrayList<>(argCount);
-			for (int j = 0; j < argCount; j++) {
-				final Integer arg = aggCall.getArgList().get(j);
-				newArgs.add(sourceOf.get(arg));
-			}
-			final int newFilterArg =
-					aggCall.filterArg >= 0 ? sourceOf.get(aggCall.filterArg) : -1;
-			final AggregateCall newAggCall =
-					AggregateCall.create(aggCall.getAggregation(), false, newArgs,
-							newFilterArg, aggCall.getType(), aggCall.getName());
-			assert refs.get(i) == null;
-			if (n == 0) {
-				refs.set(i,
-						new RexInputRef(groupAndIndicatorCount + aggCallList.size(),
-								newAggCall.getType()));
-			} else {
-				refs.set(i,
-						new RexInputRef(leftFields.size() + groupAndIndicatorCount
-								+ aggCallList.size(), newAggCall.getType()));
-			}
-			aggCallList.add(newAggCall);
-		}
-
-		final Map<Integer, Integer> map = new HashMap<>();
-		for (Integer key : aggregate.getGroupSet()) {
-			map.put(key, map.size());
-		}
-		final ImmutableBitSet newGroupSet = aggregate.getGroupSet().permute(map);
-		assert newGroupSet
-				.equals(ImmutableBitSet.range(aggregate.getGroupSet().cardinality()));
-		ImmutableList<ImmutableBitSet> newGroupingSets = null;
-		if (aggregate.indicator) {
-			newGroupingSets =
-					ImmutableBitSet.ORDERING.immutableSortedCopy(
-							ImmutableBitSet.permute(aggregate.getGroupSets(), map));
-		}
-
-		relBuilder.push(
-				aggregate.copy(aggregate.getTraitSet(), relBuilder.build(),
-						aggregate.indicator, newGroupSet, newGroupingSets, aggCallList));
-
-		// If there's no left child yet, no need to create the join
-		if (n == 0) {
-			return;
-		}
-
-		// Create the join condition. It is of the form
-		//  'left.f0 = right.f0 and left.f1 = right.f1 and ...'
-		// where {f0, f1, ...} are the GROUP BY fields.
-		final List<RelDataTypeField> distinctFields =
-				relBuilder.peek().getRowType().getFieldList();
-		final List<RexNode> conditions = Lists.newArrayList();
-		for (i = 0; i < groupAndIndicatorCount; ++i) {
-			// null values form its own group
-			// use "is not distinct from" so that the join condition
-			// allows null values to match.
-			conditions.add(
-					rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
-							RexInputRef.of(i, leftFields),
-							new RexInputRef(leftFields.size() + i,
-									distinctFields.get(i).getType())));
-		}
-
-		// Join in the new 'select distinct' relation.
-		relBuilder.join(JoinRelType.INNER, conditions);
-	}
-
-	private static void rewriteAggCalls(
-			List<AggregateCall> newAggCalls,
-			List<Integer> argList,
-			Map<Integer, Integer> sourceOf) {
-		// Rewrite the agg calls. Each distinct agg becomes a non-distinct call
-		// to the corresponding field from the right; for example,
-		// "COUNT(DISTINCT e.sal)" becomes   "COUNT(distinct_e.sal)".
-		for (int i = 0; i < newAggCalls.size(); i++) {
-			final AggregateCall aggCall = newAggCalls.get(i);
-
-			// Ignore agg calls which are not distinct or have the wrong set
-			// arguments. If we're rewriting aggregates whose args are {sal}, we will
-			// rewrite COUNT(DISTINCT sal) and SUM(DISTINCT sal) but ignore
-			// COUNT(DISTINCT gender) or SUM(sal).
-			if (!aggCall.isDistinct()) {
-				continue;
-			}
-			if (!aggCall.getArgList().equals(argList)) {
-				continue;
-			}
-
-			// Re-map arguments.
-			final int argCount = aggCall.getArgList().size();
-			final List<Integer> newArgs = new ArrayList<>(argCount);
-			for (int j = 0; j < argCount; j++) {
-				final Integer arg = aggCall.getArgList().get(j);
-				newArgs.add(sourceOf.get(arg));
-			}
-			final AggregateCall newAggCall =
-					AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
-							aggCall.getType(), aggCall.getName());
-			newAggCalls.set(i, newAggCall);
-		}
-	}
-
-	/**
-	 * Given an {@link org.apache.calcite.rel.logical.LogicalAggregate}
-	 * and the ordinals of the arguments to a
-	 * particular call to an aggregate function, creates a 'select distinct'
-	 * relational expression which projects the group columns and those
-	 * arguments but nothing else.
-	 *
-	 * <p>For example, given
-	 *
-	 * <blockquote>
-	 * <pre>select f0, count(distinct f1), count(distinct f2)
-	 * from t group by f0</pre>
-	 * </blockquote>
-	 *
-	 * and the argument list
-	 *
-	 * <blockquote>{2}</blockquote>
-	 *
-	 * returns
-	 *
-	 * <blockquote>
-	 * <pre>select distinct f0, f2 from t</pre>
-	 * </blockquote>
-	 *
-	 * '
-	 *
-	 * <p>The <code>sourceOf</code> map is populated with the source of each
-	 * column; in this case sourceOf.get(0) = 0, and sourceOf.get(1) = 2.</p>
-	 *
-	 * @param relBuilder Relational expression builder
-	 * @param aggregate Aggregate relational expression
-	 * @param argList   Ordinals of columns to make distinct
-	 * @param filterArg Ordinal of column to filter on, or -1
-	 * @param sourceOf  Out parameter, is populated with a map of where each
-	 *				  output field came from
-	 * @return Aggregate relational expression which projects the required
-	 * columns
-	 */
-	private RelBuilder createSelectDistinct(RelBuilder relBuilder,
-											Aggregate aggregate, List<Integer> argList, int filterArg,
-											Map<Integer, Integer> sourceOf) {
-		relBuilder.push(aggregate.getInput());
-		final List<Pair<RexNode, String>> projects = new ArrayList<>();
-		final List<RelDataTypeField> childFields =
-				relBuilder.peek().getRowType().getFieldList();
-		for (int i : aggregate.getGroupSet()) {
-			sourceOf.put(i, projects.size());
-			projects.add(RexInputRef.of2(i, childFields));
-		}
-		for (Integer arg : argList) {
-			if (filterArg >= 0) {
-				// Implement
-				//   agg(DISTINCT arg) FILTER $f
-				// by generating
-				//   SELECT DISTINCT ... CASE WHEN $f THEN arg ELSE NULL END AS arg
-				// and then applying
-				//   agg(arg)
-				// as usual.
-				//
-				// It works except for (rare) agg functions that need to see null
-				// values.
-				final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
-				final RexInputRef filterRef = RexInputRef.of(filterArg, childFields);
-				final Pair<RexNode, String> argRef = RexInputRef.of2(arg, childFields);
-				RexNode condition =
-						rexBuilder.makeCall(SqlStdOperatorTable.CASE, filterRef,
-								argRef.left,
-								rexBuilder.ensureType(argRef.left.getType(),
-										rexBuilder.constantNull(), true));
-				sourceOf.put(arg, projects.size());
-				projects.add(Pair.of(condition, "i$" + argRef.right));
-				continue;
-			}
-			if (sourceOf.get(arg) != null) {
-				continue;
-			}
-			sourceOf.put(arg, projects.size());
-			projects.add(RexInputRef.of2(arg, childFields));
-		}
-		relBuilder.project(Pair.left(projects), Pair.right(projects));
-
-		// Get the distinct values of the GROUP BY fields and the arguments
-		// to the agg functions.
-		relBuilder.push(
-				aggregate.copy(aggregate.getTraitSet(), relBuilder.build(), false,
-						ImmutableBitSet.range(projects.size()),
-						null, ImmutableList.<AggregateCall>of()));
-		return relBuilder;
-	}
-}
-
-// End AggregateExpandDistinctAggregatesRule.java

http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index ca55473..41f095f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -20,8 +20,7 @@ package org.apache.flink.table.plan.rules
 
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule
-import org.apache.flink.table.plan.rules.common.WindowStartEndPropertiesRule
+import org.apache.flink.table.plan.rules.common._
 import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
 
@@ -87,6 +86,8 @@ object FlinkRuleSets {
     AggregateJoinTransposeRule.EXTENDED,
     // aggregate union rule
     AggregateUnionAggregateRule.INSTANCE,
+    // expand distinct aggregate to normal aggregate with groupby
+    AggregateExpandDistinctAggregatesRule.JOIN,
 
     // remove unnecessary sort rule
     SortRemoveRule.INSTANCE,
@@ -107,9 +108,6 @@ object FlinkRuleSets {
     ProjectToCalcRule.INSTANCE,
     CalcMergeRule.INSTANCE,
 
-    // distinct aggregate rule for FLINK-3475
-    FlinkAggregateExpandDistinctAggregatesRule.JOIN,
-
     // translate to Flink DataSet nodes
     DataSetWindowAggregateRule.INSTANCE,
     DataSetAggregateRule.INSTANCE,