Posted to commits@flink.apache.org by fh...@apache.org on 2017/04/07 14:36:54 UTC
[1/2] flink git commit: [FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.
Repository: flink
Updated Branches:
refs/heads/master 635394751 -> 0038da415
[FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.
This closes #3693.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa7907ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa7907ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa7907ab
Branch: refs/heads/master
Commit: fa7907ab0e90d182b6386802c97f9b4e001dc440
Parents: 6353947
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Apr 7 01:08:12 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 7 14:07:05 2017 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 67 +++++++++-
.../flink/table/plan/rules/FlinkRuleSets.scala | 5 +-
.../common/WindowStartEndPropertiesRule.scala | 122 +++++++++++++++++++
.../flink/table/validate/FunctionCatalog.scala | 8 +-
.../scala/batch/sql/WindowAggregateTest.scala | 38 ++++--
.../scala/stream/sql/WindowAggregateTest.scala | 42 ++++---
6 files changed, 252 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 6f96920..2a838c7 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1439,7 +1439,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
<tbody>
<tr>
<td><code>TUMBLE(time_attr, interval)</code></td>
- <td>Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
+ <td>Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minute intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
</tr>
<tr>
<td><code>HOP(time_attr, interval, interval)</code></td>
@@ -1454,6 +1454,40 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
+#### Selecting Group Window Start and End Timestamps
+
+The start and end timestamps of group windows can be selected with the following auxiliary functions:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Auxiliary Function</th>
+ <th class="text-left">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td>
+ <code>TUMBLE_START(time_attr, interval)</code><br/>
+ <code>HOP_START(time_attr, interval, interval)</code><br/>
+ <code>SESSION_START(time_attr, interval)</code><br/>
+ </td>
+ <td>Returns the start timestamp of the corresponding tumbling, hopping, and session window.</td>
+ </tr>
+ <tr>
+ <td>
+ <code>TUMBLE_END(time_attr, interval)</code><br/>
+ <code>HOP_END(time_attr, interval, interval)</code><br/>
+ <code>SESSION_END(time_attr, interval)</code><br/>
+ </td>
+ <td>Returns the end timestamp of the corresponding tumbling, hopping, and session window.</td>
+ </tr>
+ </tbody>
+</table>
+
+Note that the auxiliary functions must be called with exactly the same arguments as the group window function in the `GROUP BY` clause.
+
The following examples show how to specify SQL queries with group windows on streaming tables.
<div class="codetabs" markdown="1">
@@ -1469,7 +1503,10 @@ tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sql(
- "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+ "SELECT user, " +
+ " TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart, " +
+ " SUM(amount) FROM Orders " +
+ "GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sql(
@@ -1481,7 +1518,12 @@ Table result3 = tableEnv.sql(
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sql(
- "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
+ "SELECT user, " +
+ " SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart, " +
+ " SESSION_END(rowtime(), INTERVAL '12' HOUR) AS snd, " +
+ " SUM(amount) " +
+ "FROM Orders " +
+ "GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
{% endhighlight %}
</div>
@@ -1498,7 +1540,14 @@ tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sql(
- "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")
+ """
+ |SELECT
+ | user,
+ | TUMBLE_START(rowtime(), INTERVAL '1' DAY) AS wStart,
+ | SUM(amount)
+ | FROM Orders
+ | GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user
+ """.stripMargin)
// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sql(
@@ -1510,7 +1559,15 @@ val result3 = tableEnv.sql(
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sql(
- "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")
+ """
+ |SELECT
+ | user,
+ | SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart,
+ | SESSION_END(rowtime(), INTERVAL '12' HOUR) AS sEnd,
+ | SUM(amount)
+ | FROM Orders
+ | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
+ """.stripMargin)
{% endhighlight %}
</div>
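As the note above says, an auxiliary function is only resolved when its arguments repeat the group window call in the `GROUP BY` clause verbatim. A minimal sketch of both cases (assuming the same registered Orders table and tableEnv as in the examples above; this snippet is not part of the diff):

    // Accepted: TUMBLE_START/TUMBLE_END repeat the exact TUMBLE arguments.
    val windowed = tableEnv.sql(
      """
        |SELECT
        |  user,
        |  TUMBLE_START(rowtime(), INTERVAL '1' DAY) AS wStart,
        |  TUMBLE_END(rowtime(), INTERVAL '1' DAY) AS wEnd,
        |  SUM(amount)
        |FROM Orders
        |GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user
      """.stripMargin)

    // Expected to fail validation: the auxiliary function's interval ('2' DAY)
    // does not match the window in the GROUP BY clause ('1' DAY).
    // TUMBLE_START(rowtime(), INTERVAL '2' DAY)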
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 222021a..ca55473 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules
import org.apache.calcite.rel.rules._
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule
+import org.apache.flink.table.plan.rules.common.WindowStartEndPropertiesRule
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
@@ -38,7 +39,8 @@ object FlinkRuleSets {
ProjectToWindowRule.PROJECT,
// Transform window to LogicalWindowAggregate
- DataSetLogicalWindowAggregateRule.INSTANCE
+ DataSetLogicalWindowAggregateRule.INSTANCE,
+ WindowStartEndPropertiesRule.INSTANCE
)
/**
@@ -136,6 +138,7 @@ object FlinkRuleSets {
val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
// Transform window to LogicalWindowAggregate
DataStreamLogicalWindowAggregateRule.INSTANCE,
+ WindowStartEndPropertiesRule.INSTANCE,
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
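The placement of the new rule is deliberate: its operand pattern is rooted at a LogicalWindowAggregate (see the rule below), so it can only fire after DataSetLogicalWindowAggregateRule / DataStreamLogicalWindowAggregateRule have translated the windowed GROUP BY. A minimal sketch of the resulting normalization pairing (trimmed to the two rules; the actual rule sets contain more):

    import org.apache.calcite.tools.{RuleSet, RuleSets}

    val normRules: RuleSet = RuleSets.ofList(
      // first, turn the windowed GROUP BY into a LogicalWindowAggregate
      DataStreamLogicalWindowAggregateRule.INSTANCE,
      // then, rewrite TUMBLE_START/.../SESSION_END into window properties
      WindowStartEndPropertiesRule.INSTANCE
    )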
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
new file mode 100644
index 0000000..c68da04
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+class WindowStartEndPropertiesRule
+ extends RelOptRule(
+ WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
+ "WindowStartEndPropertiesRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ // project includes at least one group auxiliary function
+ project.getProjects.exists {
+ case c: RexCall => c.getOperator.isGroupAuxiliary
+ case _ => false
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+ val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+ // Retrieve window start and end properties
+ val transformed = call.builder()
+ val rexBuilder = transformed.getRexBuilder
+ transformed.push(LogicalWindowAggregate.create(
+ agg.getWindow,
+ Seq(
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
+ ), agg)
+ )
+
+ // forward window start and end properties
+ transformed.project(
+ innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
+
+ // replace window auxiliary function by access to window properties
+ transformed.project(
+ project.getProjects.map{ x =>
+ if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+ // replace expression by access to window start
+ rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
+ } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+ // replace expression by access to window end
+ rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
+ } else {
+ // preserve expression
+ x
+ }
+ }
+ )
+ val res = transformed.build()
+ call.transformTo(res)
+ }
+}
+
+object WindowStartEndPropertiesRule {
+ private val WINDOW_EXPRESSION_RULE_PREDICATE =
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
+
+ val INSTANCE = new WindowStartEndPropertiesRule
+
+ /** Checks if a RexNode is a window start auxiliary function. */
+ private def isWindowStart(node: RexNode): Boolean = {
+ node match {
+ case n: RexCall if n.getOperator.isGroupAuxiliary =>
+ n.getOperator match {
+ case SqlStdOperatorTable.TUMBLE_START |
+ SqlStdOperatorTable.HOP_START |
+ SqlStdOperatorTable.SESSION_START
+ => true
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+
+ /** Checks if a RexNode is a window end auxiliary function. */
+ private def isWindowEnd(node: RexNode): Boolean = {
+ node match {
+ case n: RexCall if n.getOperator.isGroupAuxiliary =>
+ n.getOperator match {
+ case SqlStdOperatorTable.TUMBLE_END |
+ SqlStdOperatorTable.HOP_END |
+ SqlStdOperatorTable.SESSION_END
+ => true
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+}
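Conceptually, the rule rewrites the matched Project(Project(WindowAggregate)) stack in three steps: it re-creates the aggregate with two named window properties (w$start, w$end), forwards those fields through the inner project, and replaces each *_START/*_END call in the outer project with a CAST over the corresponding property field. The CAST (via rexBuilder.makeCast) keeps the declared return type of the auxiliary call, which is why the expected plans in the tests below read CAST(w$start) AS w$start. An illustrative before/after sketch, not actual planner output:

    Before:
      LogicalProject(user, TUMBLE_START(...), sumAmount)   <- auxiliary call
        LogicalProject(user, sumAmount)
          LogicalWindowAggregate(window: w$, select: user, SUM(amount) AS sumAmount)

    After:
      LogicalProject(user, CAST(w$start), sumAmount)
        LogicalProject(user, sumAmount, w$start, w$end)    <- properties forwarded
          LogicalWindowAggregate(window: w$, select: user, SUM(amount) AS sumAmount,
                                 start('w$) AS w$start, end('w$) AS w$end)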
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6b8fd95..74b371a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -352,8 +352,14 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
EventTimeExtractor,
ProcTimeExtractor,
SqlStdOperatorTable.TUMBLE,
+ SqlStdOperatorTable.TUMBLE_START,
+ SqlStdOperatorTable.TUMBLE_END,
SqlStdOperatorTable.HOP,
- SqlStdOperatorTable.SESSION
+ SqlStdOperatorTable.HOP_START,
+ SqlStdOperatorTable.HOP_END,
+ SqlStdOperatorTable.SESSION,
+ SqlStdOperatorTable.SESSION_START,
+ SqlStdOperatorTable.SESSION_END
)
builtInSqlOperators.foreach(register)
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index e84ede6..25a81c4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -63,7 +63,14 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
val sqlQuery =
- "SELECT c, SUM(a) AS sumA, MIN(b) AS minB FROM T GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
+ "SELECT " +
+ " TUMBLE_START(ts, INTERVAL '4' MINUTE), " +
+ " TUMBLE_END(ts, INTERVAL '4' MINUTE), " +
+ " c, " +
+ " SUM(a) AS sumA, " +
+ " MIN(b) AS minB " +
+ "FROM T " +
+ "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
val expected =
unaryNode(
@@ -73,9 +80,10 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "c"),
term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 240000.millis)),
- term("select", "c, SUM(a) AS sumA, MIN(b) AS minB")
+ term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
+ "start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "c, sumA, minB")
+ term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, sumA, minB")
)
util.verifySql(sqlQuery, expected)
@@ -117,7 +125,12 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
val sqlQuery =
- "SELECT c, SUM(a) AS sumA, AVG(b) AS avgB " +
+ "SELECT " +
+ " c, " +
+ " HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
+ " HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
+ " SUM(a) AS sumA, " +
+ " AVG(b) AS avgB " +
"FROM T " +
"GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
@@ -130,9 +143,10 @@ class WindowAggregateTest extends TableTestBase {
term("groupBy", "c, d"),
term("window",
EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)),
- term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB")
+ term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
+ "start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "c, sumA, avgB")
+ term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, sumA, avgB")
)
util.verifySql(sqlQuery, expected)
@@ -171,7 +185,12 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
val sqlQuery =
- "SELECT c, d, SUM(a) AS sumA, MIN(b) AS minB " +
+ "SELECT " +
+ " c, d, " +
+ " SESSION_START(ts, INTERVAL '12' HOUR), " +
+ " SESSION_END(ts, INTERVAL '12' HOUR), " +
+ " SUM(a) AS sumA, " +
+ " MIN(b) AS minB " +
"FROM T " +
"GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
@@ -183,9 +202,10 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "c, d"),
term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 43200000.millis)),
- term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB")
+ term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
+ "start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "c, d, sumA, minB")
+ term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, sumA, minB")
)
util.verifySql(sqlQuery, expected)
http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f4befa6..6f03bec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -17,12 +17,10 @@
*/
package org.apache.flink.table.api.scala.stream.sql
-import java.sql.Timestamp
-
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, ProcessingTimeTumblingGroupWindow}
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow}
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Test
@@ -86,7 +84,14 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testTumbleFunction() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
+
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " +
+ " TUMBLE_END(rowtime(), INTERVAL '15' MINUTE)" +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
@@ -98,17 +103,21 @@ class WindowAggregateTest extends TableTestBase {
term("select", "1970-01-01 00:00:00 AS $f0")
),
term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 900000.millis)),
- term("select", "COUNT(*) AS EXPR$0")
+ term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "EXPR$0")
+ term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
def testHoppingFunction() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
- "HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
+ val sql =
+ "SELECT COUNT(*), " +
+ " HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
+ " HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
+ "FROM MyTable " +
+ "GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
val expected =
unaryNode(
"DataStreamCalc",
@@ -121,17 +130,22 @@ class WindowAggregateTest extends TableTestBase {
),
term("window", ProcessingTimeSlidingGroupWindow(Some('w$),
3600000.millis, 900000.millis)),
- term("select", "COUNT(*) AS EXPR$0")
+ term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "EXPR$0")
+ term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
def testSessionFunction() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
- "SESSION(proctime(), INTERVAL '15' MINUTE)"
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " SESSION_START(proctime(), INTERVAL '15' MINUTE), " +
+ " SESSION_END(proctime(), INTERVAL '15' MINUTE) " +
+ "FROM MyTable " +
+ "GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
@@ -143,9 +157,9 @@ class WindowAggregateTest extends TableTestBase {
term("select", "1970-01-01 00:00:00 AS $f0")
),
term("window", ProcessingTimeSessionGroupWindow(Some('w$), 900000.millis)),
- term("select", "COUNT(*) AS EXPR$0")
+ term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
- term("select", "EXPR$0")
+ term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
[2/2] flink git commit: [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.
Posted by fh...@apache.org.
[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.
This closes #3695.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0038da41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0038da41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0038da41
Branch: refs/heads/master
Commit: 0038da41553908a427dd20be75838cccb48c6bcf
Parents: fa7907a
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Apr 7 17:46:06 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 7 14:09:56 2017 +0200
----------------------------------------------------------------------
...nkAggregateExpandDistinctAggregatesRule.java | 1158 ------------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 8 +-
2 files changed, 3 insertions(+), 1163 deletions(-)
----------------------------------------------------------------------
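The removed class was a verbatim fork of Calcite's AggregateExpandDistinctAggregatesRule, carried only to work around the bad cases described in CALCITE-1558 (see the class comment below). With Calcite bumped to 1.12 that fix is upstream, so FlinkRuleSets can register the stock rule again. A sketch of the kind of reference that replaces the fork (assumed; the actual FlinkRuleSets.scala hunk is not included in this excerpt, and which variant is registered is not visible here):

    import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule

    // Calcite >= 1.12 ships the CALCITE-1558 fix, so the stock rule suffices.
    // The fork mirrored Calcite's two predefined instances, INSTANCE and JOIN;
    // one of these takes the removed rule's place in the rule sets.
    val distinctAggRule = AggregateExpandDistinctAggregatesRule.JOIN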
http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
deleted file mode 100644
index 9d4e08e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite.rules;
-
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-
-import org.apache.flink.util.Preconditions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- Copy calcite's AggregateExpandDistinctAggregatesRule to Flink project,
- and do a quick fix to avoid some bad case mentioned in CALCITE-1558.
- Should drop it and use calcite's AggregateExpandDistinctAggregatesRule
- when we upgrade to calcite 1.12(above)
- */
-
-/**
- * Planner rule that expands distinct aggregates
- * (such as {@code COUNT(DISTINCT x)}) from a
- * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
- *
- * <p>How this is done depends upon the arguments to the function. If all
- * functions have the same argument
- * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
- * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
- * sufficient.
- *
- * <p>If there are multiple arguments
- * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
- * the rule creates separate {@code Aggregate}s and combines using a
- * {@link org.apache.calcite.rel.core.Join}.
- */
-public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
- //~ Static fields/initializers ---------------------------------------------
-
- /** The default instance of the rule; operates only on logical expressions. */
- public static final FlinkAggregateExpandDistinctAggregatesRule INSTANCE =
- new FlinkAggregateExpandDistinctAggregatesRule(LogicalAggregate.class, true,
- RelFactories.LOGICAL_BUILDER);
-
- /** Instance of the rule that operates only on logical expressions and
- * generates a join. */
- public static final FlinkAggregateExpandDistinctAggregatesRule JOIN =
- new FlinkAggregateExpandDistinctAggregatesRule(LogicalAggregate.class, false,
- RelFactories.LOGICAL_BUILDER);
-
- private static final BigDecimal TWO = BigDecimal.valueOf(2L);
-
- public final boolean useGroupingSets;
-
- //~ Constructors -----------------------------------------------------------
-
- public FlinkAggregateExpandDistinctAggregatesRule(
- Class<? extends LogicalAggregate> clazz,
- boolean useGroupingSets,
- RelBuilderFactory relBuilderFactory) {
- super(operand(clazz, any()), relBuilderFactory, null);
- this.useGroupingSets = useGroupingSets;
- }
-
- /**
- * @deprecated to be removed before 2.0
- */
- @Deprecated
- public FlinkAggregateExpandDistinctAggregatesRule(
- Class<? extends LogicalAggregate> clazz,
- boolean useGroupingSets,
- RelFactories.JoinFactory joinFactory) {
- this(clazz, useGroupingSets, RelBuilder.proto(Contexts.of(joinFactory)));
- }
-
- /**
- * @deprecated to be removed before 2.0
- */
- @Deprecated
- public FlinkAggregateExpandDistinctAggregatesRule(
- Class<? extends LogicalAggregate> clazz,
- RelFactories.JoinFactory joinFactory) {
- this(clazz, false, RelBuilder.proto(Contexts.of(joinFactory)));
- }
-
- //~ Methods ----------------------------------------------------------------
-
- public void onMatch(RelOptRuleCall call) {
- final Aggregate aggregate = call.rel(0);
- if (!aggregate.containsDistinctCall()) {
- return;
- }
-
- // Find all of the agg expressions. We use a LinkedHashSet to ensure
- // determinism.
- int nonDistinctCount = 0;
- int distinctCount = 0;
- int filterCount = 0;
- int unsupportedAggCount = 0;
- final Set<Pair<List<Integer>, Integer>> argLists = new LinkedHashSet<>();
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- if (aggCall.filterArg >= 0) {
- ++filterCount;
- }
- if (!aggCall.isDistinct()) {
- ++nonDistinctCount;
- if (!(aggCall.getAggregation() instanceof SqlCountAggFunction
- || aggCall.getAggregation() instanceof SqlSumAggFunction
- || aggCall.getAggregation() instanceof SqlMinMaxAggFunction)) {
- ++unsupportedAggCount;
- }
- continue;
- }
- ++distinctCount;
- argLists.add(Pair.of(aggCall.getArgList(), aggCall.filterArg));
- }
- Preconditions.checkState(argLists.size() > 0, "containsDistinctCall lied");
-
- // If all of the agg expressions are distinct and have the same
- // arguments then we can use a more efficient form.
- if (nonDistinctCount == 0 && argLists.size() == 1) {
- final Pair<List<Integer>, Integer> pair =
- Iterables.getOnlyElement(argLists);
- final RelBuilder relBuilder = call.builder();
- convertMonopole(relBuilder, aggregate, pair.left, pair.right);
- call.transformTo(relBuilder.build());
- return;
- }
-
- if (useGroupingSets) {
- rewriteUsingGroupingSets(call, aggregate, argLists);
- return;
- }
-
- // If only one distinct aggregate and one or more non-distinct aggregates,
- // we can generate multi-phase aggregates
- if (distinctCount == 1 // one distinct aggregate
- && filterCount == 0 // no filter
- && unsupportedAggCount == 0 // sum/min/max/count in non-distinct aggregate
- && nonDistinctCount > 0) { // one or more non-distinct aggregates
- final RelBuilder relBuilder = call.builder();
- convertSingletonDistinct(relBuilder, aggregate, argLists);
- call.transformTo(relBuilder.build());
- return;
- }
-
- // Create a list of the expressions which will yield the final result.
- // Initially, the expressions point to the input field.
- final List<RelDataTypeField> aggFields =
- aggregate.getRowType().getFieldList();
- final List<RexInputRef> refs = new ArrayList<>();
- final List<String> fieldNames = aggregate.getRowType().getFieldNames();
- final ImmutableBitSet groupSet = aggregate.getGroupSet();
- final int groupAndIndicatorCount =
- aggregate.getGroupCount() + aggregate.getIndicatorCount();
- for (int i : Util.range(groupAndIndicatorCount)) {
- refs.add(RexInputRef.of(i, aggFields));
- }
-
- // Aggregate the original relation, including any non-distinct aggregates.
- final List<AggregateCall> newAggCallList = new ArrayList<>();
- int i = -1;
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- ++i;
- if (aggCall.isDistinct()) {
- refs.add(null);
- continue;
- }
- refs.add(
- new RexInputRef(
- groupAndIndicatorCount + newAggCallList.size(),
- aggFields.get(groupAndIndicatorCount + i).getType()));
- newAggCallList.add(aggCall);
- }
-
- // In the case where there are no non-distinct aggregates (regardless of
- // whether there are group bys), there's no need to generate the
- // extra aggregate and join.
- final RelBuilder relBuilder = call.builder();
- relBuilder.push(aggregate.getInput());
- int n = 0;
- if (!newAggCallList.isEmpty()) {
- final RelBuilder.GroupKey groupKey =
- relBuilder.groupKey(groupSet, aggregate.indicator, aggregate.getGroupSets());
- relBuilder.aggregate(groupKey, newAggCallList);
- ++n;
- }
-
- // For each set of operands, find and rewrite all calls which have that
- // set of operands.
- for (Pair<List<Integer>, Integer> argList : argLists) {
- doRewrite(relBuilder, aggregate, n++, argList.left, argList.right, refs);
- }
-
- relBuilder.project(refs, fieldNames);
- call.transformTo(relBuilder.build());
- }
-
- /**
- * Converts an aggregate with one distinct aggregate and one or more
- * non-distinct aggregates to multi-phase aggregates (see reference example
- * below).
- *
- * @param relBuilder Contains the input relational expression
- * @param aggregate Original aggregate
- * @param argLists Arguments and filters to the distinct aggregate function
- *
- */
- private RelBuilder convertSingletonDistinct(RelBuilder relBuilder,
- Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
- // For example,
- // SELECT deptno, COUNT(*), SUM(bonus), MIN(DISTINCT sal)
- // FROM emp
- // GROUP BY deptno
- //
- // becomes
- //
- // SELECT deptno, SUM(cnt), SUM(bonus), MIN(sal)
- // FROM (
- // SELECT deptno, COUNT(*) as cnt, SUM(bonus), sal
- // FROM EMP
- // GROUP BY deptno, sal) // Aggregate B
- // GROUP BY deptno // Aggregate A
- relBuilder.push(aggregate.getInput());
- final List<Pair<RexNode, String>> projects = new ArrayList<>();
- final Map<Integer, Integer> sourceOf = new HashMap<>();
- SortedSet<Integer> newGroupSet = new TreeSet<>();
- final List<RelDataTypeField> childFields =
- relBuilder.peek().getRowType().getFieldList();
- final boolean hasGroupBy = aggregate.getGroupSet().size() > 0;
-
- SortedSet<Integer> groupSet = new TreeSet<>(aggregate.getGroupSet().asList());
-
- // Add the distinct aggregate column(s) to the group-by columns,
- // if not already a part of the group-by
- newGroupSet.addAll(aggregate.getGroupSet().asList());
- for (Pair<List<Integer>, Integer> argList : argLists) {
- newGroupSet.addAll(argList.getKey());
- }
-
- // Re-map the arguments to the aggregate A. These arguments will get
- // remapped because of the intermediate aggregate B generated as part of the
- // transformation.
- for (int arg : newGroupSet) {
- sourceOf.put(arg, projects.size());
- projects.add(RexInputRef.of2(arg, childFields));
- }
- // Generate the intermediate aggregate B
- final List<AggregateCall> aggCalls = aggregate.getAggCallList();
- final List<AggregateCall> newAggCalls = new ArrayList<>();
- final List<Integer> fakeArgs = new ArrayList<>();
- final Map<AggregateCall, Integer> callArgMap = new HashMap<>();
- // First identify the real arguments, then use the rest for fake arguments
- // e.g. if real arguments are 0, 1, 3. Then the fake arguments will be 2, 4
- for (final AggregateCall aggCall : aggCalls) {
- if (!aggCall.isDistinct()) {
- for (int arg : aggCall.getArgList()) {
- if (!groupSet.contains(arg)) {
- sourceOf.put(arg, projects.size());
- }
- }
- }
- }
- int fakeArg0 = 0;
- for (final AggregateCall aggCall : aggCalls) {
- // We will deal with non-distinct aggregates below
- if (!aggCall.isDistinct()) {
- boolean isGroupKeyUsedInAgg = false;
- for (int arg : aggCall.getArgList()) {
- if (groupSet.contains(arg)) {
- isGroupKeyUsedInAgg = true;
- break;
- }
- }
- if (aggCall.getArgList().size() == 0 || isGroupKeyUsedInAgg) {
- while (sourceOf.get(fakeArg0) != null) {
- ++fakeArg0;
- }
- fakeArgs.add(fakeArg0);
- ++fakeArg0;
- }
- }
- }
- for (final AggregateCall aggCall : aggCalls) {
- if (!aggCall.isDistinct()) {
- for (int arg : aggCall.getArgList()) {
- if (!groupSet.contains(arg)) {
- sourceOf.remove(arg);
- }
- }
- }
- }
- // Compute the remapped arguments using fake arguments for non-distinct
- // aggregates with no arguments e.g. count(*).
- int fakeArgIdx = 0;
- for (final AggregateCall aggCall : aggCalls) {
- // Project the column corresponding to the distinct aggregate. Project
- // as-is all the non-distinct aggregates
- if (!aggCall.isDistinct()) {
- final AggregateCall newCall =
- AggregateCall.create(aggCall.getAggregation(), false,
- aggCall.getArgList(), -1,
- ImmutableBitSet.of(newGroupSet).cardinality(),
- relBuilder.peek(), null, aggCall.name);
- newAggCalls.add(newCall);
- if (newCall.getArgList().size() == 0) {
- int fakeArg = fakeArgs.get(fakeArgIdx);
- callArgMap.put(newCall, fakeArg);
- sourceOf.put(fakeArg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
- newCall.getName()));
- ++fakeArgIdx;
- } else {
- for (int arg : newCall.getArgList()) {
- if (groupSet.contains(arg)) {
- int fakeArg = fakeArgs.get(fakeArgIdx);
- callArgMap.put(newCall, fakeArg);
- sourceOf.put(fakeArg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
- newCall.getName()));
- ++fakeArgIdx;
- } else {
- sourceOf.put(arg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(arg, newCall.getType()),
- newCall.getName()));
- }
- }
- }
- }
- }
- // Generate the aggregate B (see the reference example above)
- relBuilder.push(
- aggregate.copy(
- aggregate.getTraitSet(), relBuilder.build(),
- false, ImmutableBitSet.of(newGroupSet), null, newAggCalls));
- // Convert the existing aggregate to aggregate A (see the reference example above)
- final List<AggregateCall> newTopAggCalls =
- Lists.newArrayList(aggregate.getAggCallList());
- // Use the remapped arguments for the (non)distinct aggregate calls
- for (int i = 0; i < newTopAggCalls.size(); i++) {
- // Re-map arguments.
- final AggregateCall aggCall = newTopAggCalls.get(i);
- final int argCount = aggCall.getArgList().size();
- final List<Integer> newArgs = new ArrayList<>(argCount);
- final AggregateCall newCall;
-
-
- for (int j = 0; j < argCount; j++) {
- final Integer arg = aggCall.getArgList().get(j);
- if (callArgMap.containsKey(aggCall)) {
- newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
- }
- else {
- newArgs.add(sourceOf.get(arg));
- }
- }
- if (aggCall.isDistinct()) {
- newCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs,
- -1, aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.name);
- } else {
- // If aggregate B had a COUNT aggregate call the corresponding aggregate at
- // aggregate A must be SUM. For other aggregates, it remains the same.
- if (aggCall.getAggregation() instanceof SqlCountAggFunction) {
- if (aggCall.getArgList().size() == 0) {
- newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
- }
- if (hasGroupBy) {
- SqlSumAggFunction sumAgg = new SqlSumAggFunction(null);
- newCall =
- AggregateCall.create(sumAgg, false, newArgs, -1,
- aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.getName());
- } else {
- SqlSumEmptyIsZeroAggFunction sumAgg = new SqlSumEmptyIsZeroAggFunction();
- newCall =
- AggregateCall.create(sumAgg, false, newArgs, -1,
- aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.getName());
- }
- } else {
- newCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
- aggregate.getGroupSet().cardinality(),
- relBuilder.peek(), aggCall.getType(), aggCall.name);
- }
- }
- newTopAggCalls.set(i, newCall);
- }
- // Populate the group-by keys with the remapped arguments for aggregate A
- newGroupSet.clear();
- for (int arg : aggregate.getGroupSet()) {
- newGroupSet.add(sourceOf.get(arg));
- }
- relBuilder.push(
- aggregate.copy(aggregate.getTraitSet(),
- relBuilder.build(), aggregate.indicator,
- ImmutableBitSet.of(newGroupSet), null, newTopAggCalls));
- return relBuilder;
- }
- /*
- public RelBuilder convertSingletonDistinct(RelBuilder relBuilder,
- Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
- // For example,
- // SELECT deptno, COUNT(*), SUM(bonus), MIN(DISTINCT sal)
- // FROM emp
- // GROUP BY deptno
- //
- // becomes
- //
- // SELECT deptno, SUM(cnt), SUM(bonus), MIN(sal)
- // FROM (
- // SELECT deptno, COUNT(*) as cnt, SUM(bonus), sal
- // FROM EMP
- // GROUP BY deptno, sal) // Aggregate B
- // GROUP BY deptno // Aggregate A
- relBuilder.push(aggregate.getInput());
- final List<Pair<RexNode, String>> projects = new ArrayList<>();
- final Map<Integer, Integer> sourceOf = new HashMap<>();
- SortedSet<Integer> newGroupSet = new TreeSet<>();
- final List<RelDataTypeField> childFields =
- relBuilder.peek().getRowType().getFieldList();
- final boolean hasGroupBy = aggregate.getGroupSet().size() > 0;
-
- // Add the distinct aggregate column(s) to the group-by columns,
- // if not already a part of the group-by
- newGroupSet.addAll(aggregate.getGroupSet().asList());
- for (Pair<List<Integer>, Integer> argList : argLists) {
- newGroupSet.addAll(argList.getKey());
- }
-
- // Re-map the arguments to the aggregate A. These arguments will get
- // remapped because of the intermediate aggregate B generated as part of the
- // transformation.
- for (int arg : newGroupSet) {
- sourceOf.put(arg, projects.size());
- projects.add(RexInputRef.of2(arg, childFields));
- }
- // Generate the intermediate aggregate B
- final List<AggregateCall> aggCalls = aggregate.getAggCallList();
- final List<AggregateCall> newAggCalls = new ArrayList<>();
- final List<Integer> fakeArgs = new ArrayList<>();
- final Map<AggregateCall, Integer> callArgMap = new HashMap<>();
- // First identify the real arguments, then use the rest for fake arguments
- // e.g. if real arguments are 0, 1, 3. Then the fake arguments will be 2, 4
- for (final AggregateCall aggCall : aggCalls) {
- if (!aggCall.isDistinct()) {
- for (int arg : aggCall.getArgList()) {
- if (!sourceOf.containsKey(arg)) {
- sourceOf.put(arg, projects.size());
- }
- }
- }
- }
- int fakeArg0 = 0;
- for (final AggregateCall aggCall : aggCalls) {
- // We will deal with non-distinct aggregates below
- if (!aggCall.isDistinct()) {
- boolean isGroupKeyUsedInAgg = false;
- for (int arg : aggCall.getArgList()) {
- if (sourceOf.containsKey(arg)) {
- isGroupKeyUsedInAgg = true;
- break;
- }
- }
- if (aggCall.getArgList().size() == 0 || isGroupKeyUsedInAgg) {
- while (sourceOf.get(fakeArg0) != null) {
- ++fakeArg0;
- }
- fakeArgs.add(fakeArg0);
- }
- }
- }
- for (final AggregateCall aggCall : aggCalls) {
- if (!aggCall.isDistinct()) {
- for (int arg : aggCall.getArgList()) {
- if (!sourceOf.containsKey(arg)) {
- sourceOf.remove(arg);
- }
- }
- }
- }
- // Compute the remapped arguments using fake arguments for non-distinct
- // aggregates with no arguments e.g. count(*).
- int fakeArgIdx = 0;
- for (final AggregateCall aggCall : aggCalls) {
- // Project the column corresponding to the distinct aggregate. Project
- // as-is all the non-distinct aggregates
- if (!aggCall.isDistinct()) {
- final AggregateCall newCall =
- AggregateCall.create(aggCall.getAggregation(), false,
- aggCall.getArgList(), -1,
- ImmutableBitSet.of(newGroupSet).cardinality(),
- relBuilder.peek(), null, aggCall.name);
- newAggCalls.add(newCall);
- if (newCall.getArgList().size() == 0) {
- int fakeArg = fakeArgs.get(fakeArgIdx);
- callArgMap.put(newCall, fakeArg);
- sourceOf.put(fakeArg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
- newCall.getName()));
- ++fakeArgIdx;
- } else {
- for (int arg : newCall.getArgList()) {
- if (sourceOf.containsKey(arg)) {
- int fakeArg = fakeArgs.get(fakeArgIdx);
- callArgMap.put(newCall, fakeArg);
- sourceOf.put(fakeArg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(fakeArg, newCall.getType()),
- newCall.getName()));
- ++fakeArgIdx;
- } else {
- sourceOf.put(arg, projects.size());
- projects.add(
- Pair.of((RexNode) new RexInputRef(arg, newCall.getType()),
- newCall.getName()));
- }
- }
- }
- }
- }
- // Generate the aggregate B (see the reference example above)
- relBuilder.push(
- aggregate.copy(
- aggregate.getTraitSet(), relBuilder.build(),
- false, ImmutableBitSet.of(newGroupSet), null, newAggCalls));
- // Convert the existing aggregate to aggregate A (see the reference example above)
- final List<AggregateCall> newTopAggCalls =
- Lists.newArrayList(aggregate.getAggCallList());
- // Use the remapped arguments for the (non)distinct aggregate calls
- for (int i = 0; i < newTopAggCalls.size(); i++) {
- // Re-map arguments.
- final AggregateCall aggCall = newTopAggCalls.get(i);
- final int argCount = aggCall.getArgList().size();
- final List<Integer> newArgs = new ArrayList<>(argCount);
- final AggregateCall newCall;
-
-
- for (int j = 0; j < argCount; j++) {
- final Integer arg = aggCall.getArgList().get(j);
- if (callArgMap.containsKey(aggCall)) {
- newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
- }
- else {
- newArgs.add(sourceOf.get(arg));
- }
- }
- if (aggCall.isDistinct()) {
- newCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs,
- -1, aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.name);
- } else {
- // If aggregate B had a COUNT aggregate call the corresponding aggregate at
- // aggregate A must be SUM. For other aggregates, it remains the same.
- if (aggCall.getAggregation() instanceof SqlCountAggFunction) {
- if (aggCall.getArgList().size() == 0) {
- newArgs.add(sourceOf.get(callArgMap.get(aggCall)));
- }
- if (hasGroupBy) {
- SqlSumAggFunction sumAgg = new SqlSumAggFunction(null);
- newCall =
- AggregateCall.create(sumAgg, false, newArgs, -1,
- aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.getName());
- } else {
- SqlSumEmptyIsZeroAggFunction sumAgg = new SqlSumEmptyIsZeroAggFunction();
- newCall =
- AggregateCall.create(sumAgg, false, newArgs, -1,
- aggregate.getGroupSet().cardinality(), relBuilder.peek(),
- aggCall.getType(), aggCall.getName());
- }
- } else {
- newCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
- aggregate.getGroupSet().cardinality(),
- relBuilder.peek(), aggCall.getType(), aggCall.name);
- }
- }
- newTopAggCalls.set(i, newCall);
- }
- // Populate the group-by keys with the remapped arguments for aggregate A
- newGroupSet.clear();
- for (int arg : aggregate.getGroupSet()) {
- newGroupSet.add(sourceOf.get(arg));
- }
- relBuilder.push(
- aggregate.copy(aggregate.getTraitSet(),
- relBuilder.build(), aggregate.indicator,
- ImmutableBitSet.of(newGroupSet), null, newTopAggCalls));
- return relBuilder;
- }
- */
-
- @SuppressWarnings("DanglingJavadoc")
- private void rewriteUsingGroupingSets(RelOptRuleCall call,
- Aggregate aggregate, Set<Pair<List<Integer>, Integer>> argLists) {
- final Set<ImmutableBitSet> groupSetTreeSet =
- new TreeSet<>(ImmutableBitSet.ORDERING);
- groupSetTreeSet.add(aggregate.getGroupSet());
- for (Pair<List<Integer>, Integer> argList : argLists) {
- groupSetTreeSet.add(
- ImmutableBitSet.of(argList.left)
- .setIf(argList.right, argList.right >= 0)
- .union(aggregate.getGroupSet()));
- }
-
- final ImmutableList<ImmutableBitSet> groupSets =
- ImmutableList.copyOf(groupSetTreeSet);
- final ImmutableBitSet fullGroupSet = ImmutableBitSet.union(groupSets);
-
- final List<AggregateCall> distinctAggCalls = new ArrayList<>();
- for (Pair<AggregateCall, String> aggCall : aggregate.getNamedAggCalls()) {
- if (!aggCall.left.isDistinct()) {
- distinctAggCalls.add(aggCall.left.rename(aggCall.right));
- }
- }
-
- final RelBuilder relBuilder = call.builder();
- relBuilder.push(aggregate.getInput());
- relBuilder.aggregate(relBuilder.groupKey(fullGroupSet, groupSets.size() > 1, groupSets),
- distinctAggCalls);
- final RelNode distinct = relBuilder.peek();
- final int groupCount = fullGroupSet.cardinality();
- final int indicatorCount = groupSets.size() > 1 ? groupCount : 0;
-
- final RelOptCluster cluster = aggregate.getCluster();
- final RexBuilder rexBuilder = cluster.getRexBuilder();
- final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
- final RelDataType booleanType =
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.BOOLEAN), false);
- final List<Pair<RexNode, String>> predicates = new ArrayList<>();
- final Map<ImmutableBitSet, Integer> filters = new HashMap<>();
-
- /** Function to register a filter for a group set. */
- class Registrar {
- RexNode group = null;
-
- private int register(ImmutableBitSet groupSet) {
- if (group == null) {
- group = makeGroup(groupCount - 1);
- }
- final RexNode node =
- rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, group,
- rexBuilder.makeExactLiteral(
- toNumber(remap(fullGroupSet, groupSet))));
- predicates.add(Pair.of(node, toString(groupSet)));
- return groupCount + indicatorCount + distinctAggCalls.size()
- + predicates.size() - 1;
- }
-
- private RexNode makeGroup(int i) {
- final RexInputRef ref =
- rexBuilder.makeInputRef(booleanType, groupCount + i);
- final RexNode kase =
- rexBuilder.makeCall(SqlStdOperatorTable.CASE, ref,
- rexBuilder.makeExactLiteral(BigDecimal.ZERO),
- rexBuilder.makeExactLiteral(TWO.pow(i)));
- if (i == 0) {
- return kase;
- } else {
- return rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
- makeGroup(i - 1), kase);
- }
- }
-
- private BigDecimal toNumber(ImmutableBitSet bitSet) {
- BigDecimal n = BigDecimal.ZERO;
- for (int key : bitSet) {
- n = n.add(TWO.pow(key));
- }
- return n;
- }
-
- private String toString(ImmutableBitSet bitSet) {
- final StringBuilder buf = new StringBuilder("$i");
- for (int key : bitSet) {
- buf.append(key).append('_');
- }
- return buf.substring(0, buf.length() - 1);
- }
- }
- final Registrar registrar = new Registrar();
- for (ImmutableBitSet groupSet : groupSets) {
- filters.put(groupSet, registrar.register(groupSet));
- }
-
- if (!predicates.isEmpty()) {
- List<Pair<RexNode, String>> nodes = new ArrayList<>();
- for (RelDataTypeField f : relBuilder.peek().getRowType().getFieldList()) {
- final RexNode node = rexBuilder.makeInputRef(f.getType(), f.getIndex());
- nodes.add(Pair.of(node, f.getName()));
- }
- nodes.addAll(predicates);
- relBuilder.project(Pair.left(nodes), Pair.right(nodes));
- }
-
- int x = groupCount + indicatorCount;
- final List<AggregateCall> newCalls = new ArrayList<>();
- for (AggregateCall aggCall : aggregate.getAggCallList()) {
- final int newFilterArg;
- final List<Integer> newArgList;
- final SqlAggFunction aggregation;
- if (!aggCall.isDistinct()) {
- aggregation = SqlStdOperatorTable.MIN;
- newArgList = ImmutableIntList.of(x++);
- newFilterArg = filters.get(aggregate.getGroupSet());
- } else {
- aggregation = aggCall.getAggregation();
- newArgList = remap(fullGroupSet, aggCall.getArgList());
- newFilterArg =
- filters.get(
- ImmutableBitSet.of(aggCall.getArgList())
- .setIf(aggCall.filterArg, aggCall.filterArg >= 0)
- .union(aggregate.getGroupSet()));
- }
- final AggregateCall newCall =
- AggregateCall.create(aggregation, false, newArgList, newFilterArg,
- aggregate.getGroupCount(), distinct, null, aggCall.name);
- newCalls.add(newCall);
- }
-
- relBuilder.aggregate(
- relBuilder.groupKey(
- remap(fullGroupSet, aggregate.getGroupSet()),
- aggregate.indicator,
- remap(fullGroupSet, aggregate.getGroupSets())),
- newCalls);
- relBuilder.convert(aggregate.getRowType(), true);
- call.transformTo(relBuilder.build());
- }
-
- private static ImmutableBitSet remap(ImmutableBitSet groupSet,
- ImmutableBitSet bitSet) {
- final ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
- for (Integer bit : bitSet) {
- builder.set(remap(groupSet, bit));
- }
- return builder.build();
- }
-
- private static ImmutableList<ImmutableBitSet> remap(ImmutableBitSet groupSet,
- Iterable<ImmutableBitSet> bitSets) {
- final ImmutableList.Builder<ImmutableBitSet> builder =
- ImmutableList.builder();
- for (ImmutableBitSet bitSet : bitSets) {
- builder.add(remap(groupSet, bitSet));
- }
- return builder.build();
- }
-
- private static List<Integer> remap(ImmutableBitSet groupSet,
- List<Integer> argList) {
- ImmutableIntList list = ImmutableIntList.of();
- for (int arg : argList) {
- list = list.append(remap(groupSet, arg));
- }
- return list;
- }
-
- private static int remap(ImmutableBitSet groupSet, int arg) {
- return arg < 0 ? -1 : groupSet.indexOf(arg);
- }
-
- /**
- * Converts an aggregate relational expression that contains just one
- * distinct aggregate function (or perhaps several over the same arguments)
- * and no non-distinct aggregate functions.
- */
- private RelBuilder convertMonopole(RelBuilder relBuilder, Aggregate aggregate,
- List<Integer> argList, int filterArg) {
- // For example,
- // SELECT deptno, COUNT(DISTINCT sal), SUM(DISTINCT sal)
- // FROM emp
- // GROUP BY deptno
- //
- // becomes
- //
- // SELECT deptno, COUNT(distinct_sal), SUM(distinct_sal)
- // FROM (
- // SELECT DISTINCT deptno, sal AS distinct_sal
- // FROM EMP GROUP BY deptno)
- // GROUP BY deptno
-
- // Project the columns of the GROUP BY plus the arguments
- // to the agg function.
- final Map<Integer, Integer> sourceOf = new HashMap<>();
- createSelectDistinct(relBuilder, aggregate, argList, filterArg, sourceOf);
-
- // Create an aggregate on top, with the new aggregate list.
- final List<AggregateCall> newAggCalls =
- Lists.newArrayList(aggregate.getAggCallList());
- rewriteAggCalls(newAggCalls, argList, sourceOf);
- final int cardinality = aggregate.getGroupSet().cardinality();
- relBuilder.push(
- aggregate.copy(aggregate.getTraitSet(), relBuilder.build(),
- aggregate.indicator, ImmutableBitSet.range(cardinality), null,
- newAggCalls));
- return relBuilder;
- }
-
- /**
- * Converts all distinct aggregate calls to a given set of arguments.
- *
- * <p>This method is called several times, one for each set of arguments.
- * Each time it is called, it generates a JOIN to a new SELECT DISTINCT
- * relational expression, and modifies the set of top-level calls.
- *
- * @param aggregate Original aggregate
- * @param n Ordinal of this in a join. {@code relBuilder} contains the
- * input relational expression (either the original
- * aggregate, the output from the previous call to this
- * method. {@code n} is 0 if we're converting the
- * first distinct aggregate in a query with no non-distinct
- * aggregates)
- * @param argList Arguments to the distinct aggregate function
- * @param filterArg Argument that filters input to aggregate function, or -1
- * @param refs Array of expressions which will be the projected by the
- * result of this rule. Those relating to this arg list will
- * be modified @return Relational expression
- */
- private void doRewrite(RelBuilder relBuilder, Aggregate aggregate, int n,
- List<Integer> argList, int filterArg, List<RexInputRef> refs) {
- final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
- final List<RelDataTypeField> leftFields;
- if (n == 0) {
- leftFields = null;
- } else {
- leftFields = relBuilder.peek().getRowType().getFieldList();
- }
-
- // LogicalAggregate(
- // child,
- // {COUNT(DISTINCT 1), SUM(DISTINCT 1), SUM(2)})
- //
- // becomes
- //
- // LogicalAggregate(
- // LogicalJoin(
- // child,
- // LogicalAggregate(child, < all columns > {}),
- // INNER,
- // <f2 = f5>))
- //
- // E.g.
- // SELECT deptno, SUM(DISTINCT sal), COUNT(DISTINCT gender), MAX(age)
- // FROM Emps
- // GROUP BY deptno
- //
- // becomes
- //
- // SELECT e.deptno, adsal.sum_sal, adgender.count_gender, e.max_age
- // FROM (
- // SELECT deptno, MAX(age) as max_age
- // FROM Emps GROUP BY deptno) AS e
- // JOIN (
- // SELECT deptno, COUNT(gender) AS count_gender FROM (
- // SELECT DISTINCT deptno, gender FROM Emps) AS dgender
- // GROUP BY deptno) AS adgender
- // ON e.deptno = adgender.deptno
- // JOIN (
- // SELECT deptno, SUM(sal) AS sum_sal FROM (
- // SELECT DISTINCT deptno, sal FROM Emps) AS dsal
- // GROUP BY deptno) AS adsal
- // ON e.deptno = adsal.deptno
- // GROUP BY e.deptno
- //
- // Note that if a query contains no non-distinct aggregates, then the
- // very first join/group by is omitted. In the example above, if
- // MAX(age) is removed, then the sub-select of "e" is not needed, and
- // instead the two other group by's are joined to one another.
-
- // Project the columns of the GROUP BY plus the arguments
- // to the agg function.
- final Map<Integer, Integer> sourceOf = new HashMap<>();
- createSelectDistinct(relBuilder, aggregate, argList, filterArg, sourceOf);
-
- // Now compute the aggregate functions on top of the distinct dataset.
- // Each distinct agg becomes a non-distinct call to the corresponding
- // field from the right; for example,
- // "COUNT(DISTINCT e.sal)"
- // becomes
- // "COUNT(distinct_e.sal)".
- final List<AggregateCall> aggCallList = new ArrayList<>();
- final List<AggregateCall> aggCalls = aggregate.getAggCallList();
-
- final int groupAndIndicatorCount =
- aggregate.getGroupCount() + aggregate.getIndicatorCount();
- int i = groupAndIndicatorCount - 1;
- for (AggregateCall aggCall : aggCalls) {
- ++i;
-
- // Ignore agg calls which are not distinct or have the wrong set of
- // arguments. If we're rewriting aggs whose args are {sal}, we will
- // rewrite COUNT(DISTINCT sal) and SUM(DISTINCT sal) but ignore
- // COUNT(DISTINCT gender) or SUM(sal).
- if (!aggCall.isDistinct()) {
- continue;
- }
- if (!aggCall.getArgList().equals(argList)) {
- continue;
- }
-
- // Re-map arguments.
- final int argCount = aggCall.getArgList().size();
- final List<Integer> newArgs = new ArrayList<>(argCount);
- for (int j = 0; j < argCount; j++) {
- final Integer arg = aggCall.getArgList().get(j);
- newArgs.add(sourceOf.get(arg));
- }
- final int newFilterArg =
- aggCall.filterArg >= 0 ? sourceOf.get(aggCall.filterArg) : -1;
- final AggregateCall newAggCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs,
- newFilterArg, aggCall.getType(), aggCall.getName());
- assert refs.get(i) == null;
- if (n == 0) {
- refs.set(i,
- new RexInputRef(groupAndIndicatorCount + aggCallList.size(),
- newAggCall.getType()));
- } else {
- refs.set(i,
- new RexInputRef(leftFields.size() + groupAndIndicatorCount
- + aggCallList.size(), newAggCall.getType()));
- }
- aggCallList.add(newAggCall);
- }
-
- final Map<Integer, Integer> map = new HashMap<>();
- for (Integer key : aggregate.getGroupSet()) {
- map.put(key, map.size());
- }
- final ImmutableBitSet newGroupSet = aggregate.getGroupSet().permute(map);
- assert newGroupSet
- .equals(ImmutableBitSet.range(aggregate.getGroupSet().cardinality()));
- ImmutableList<ImmutableBitSet> newGroupingSets = null;
- if (aggregate.indicator) {
- newGroupingSets =
- ImmutableBitSet.ORDERING.immutableSortedCopy(
- ImmutableBitSet.permute(aggregate.getGroupSets(), map));
- }
-
- relBuilder.push(
- aggregate.copy(aggregate.getTraitSet(), relBuilder.build(),
- aggregate.indicator, newGroupSet, newGroupingSets, aggCallList));
-
- // If there's no left child yet, no need to create the join
- if (n == 0) {
- return;
- }
-
- // Create the join condition. It is of the form
- // 'left.f0 = right.f0 and left.f1 = right.f1 and ...'
- // where {f0, f1, ...} are the GROUP BY fields.
- final List<RelDataTypeField> distinctFields =
- relBuilder.peek().getRowType().getFieldList();
- final List<RexNode> conditions = Lists.newArrayList();
- for (i = 0; i < groupAndIndicatorCount; ++i) {
- // null values form their own group;
- // use "is not distinct from" so that the join condition
- // allows null values to match.
- conditions.add(
- rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
- RexInputRef.of(i, leftFields),
- new RexInputRef(leftFields.size() + i,
- distinctFields.get(i).getType())));
- }
-
- // Join in the new 'select distinct' relation.
- relBuilder.join(JoinRelType.INNER, conditions);
- }
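
Reusing the aliases from the plan comment above (with e and adsal standing for
the derived tables there), the join built at the end of doRewrite is NULL-safe;
a minimal SQL sketch of the generated condition, illustrative only and not the
literal plan output:

    -- a NULL group key on the left matches a NULL group key on the right
    SELECT e.deptno, adsal.sum_sal
    FROM e JOIN adsal
      ON e.deptno IS NOT DISTINCT FROM adsal.deptno
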
-
- private static void rewriteAggCalls(
- List<AggregateCall> newAggCalls,
- List<Integer> argList,
- Map<Integer, Integer> sourceOf) {
- // Rewrite the agg calls. Each distinct agg becomes a non-distinct call
- // to the corresponding field from the right; for example,
- // "COUNT(DISTINCT e.sal)" becomes "COUNT(distinct_e.sal)".
- for (int i = 0; i < newAggCalls.size(); i++) {
- final AggregateCall aggCall = newAggCalls.get(i);
-
- // Ignore agg calls which are not distinct or have the wrong set of
- // arguments. If we're rewriting aggregates whose args are {sal}, we will
- // rewrite COUNT(DISTINCT sal) and SUM(DISTINCT sal) but ignore
- // COUNT(DISTINCT gender) or SUM(sal).
- if (!aggCall.isDistinct()) {
- continue;
- }
- if (!aggCall.getArgList().equals(argList)) {
- continue;
- }
-
- // Re-map arguments.
- final int argCount = aggCall.getArgList().size();
- final List<Integer> newArgs = new ArrayList<>(argCount);
- for (int j = 0; j < argCount; j++) {
- final Integer arg = aggCall.getArgList().get(j);
- newArgs.add(sourceOf.get(arg));
- }
- final AggregateCall newAggCall =
- AggregateCall.create(aggCall.getAggregation(), false, newArgs, -1,
- aggCall.getType(), aggCall.getName());
- newAggCalls.set(i, newAggCall);
- }
- }
-
- /**
- * Given an {@link org.apache.calcite.rel.logical.LogicalAggregate}
- * and the ordinals of the arguments to a
- * particular call to an aggregate function, creates a 'select distinct'
- * relational expression which projects the group columns and those
- * arguments but nothing else.
- *
- * <p>For example, given
- *
- * <blockquote>
- * <pre>select f0, count(distinct f1), count(distinct f2)
- * from t group by f0</pre>
- * </blockquote>
- *
- * and the argument list
- *
- * <blockquote>{2}</blockquote>
- *
- * returns
- *
- * <blockquote>
- * <pre>select distinct f0, f2 from t</pre>
- * </blockquote>
- *
- * <p>The <code>sourceOf</code> map is populated with the new position of
- * each source column; in this case sourceOf.get(0) = 0, and
- * sourceOf.get(2) = 1.</p>
- *
- * @param relBuilder Relational expression builder
- * @param aggregate Aggregate relational expression
- * @param argList Ordinals of columns to make distinct
- * @param filterArg Ordinal of column to filter on, or -1
- * @param sourceOf Out parameter; populated with a map from each source
- * column to its position in the output projection
- * @return The relational expression builder, with an aggregate that
- * projects the required distinct columns on top
- */
- private RelBuilder createSelectDistinct(RelBuilder relBuilder,
- Aggregate aggregate, List<Integer> argList, int filterArg,
- Map<Integer, Integer> sourceOf) {
- relBuilder.push(aggregate.getInput());
- final List<Pair<RexNode, String>> projects = new ArrayList<>();
- final List<RelDataTypeField> childFields =
- relBuilder.peek().getRowType().getFieldList();
- for (int i : aggregate.getGroupSet()) {
- sourceOf.put(i, projects.size());
- projects.add(RexInputRef.of2(i, childFields));
- }
- for (Integer arg : argList) {
- if (filterArg >= 0) {
- // Implement
- // agg(DISTINCT arg) FILTER $f
- // by generating
- // SELECT DISTINCT ... CASE WHEN $f THEN arg ELSE NULL END AS arg
- // and then applying
- // agg(arg)
- // as usual.
- //
- // It works except for (rare) agg functions that need to see null
- // values.
- final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
- final RexInputRef filterRef = RexInputRef.of(filterArg, childFields);
- final Pair<RexNode, String> argRef = RexInputRef.of2(arg, childFields);
- RexNode condition =
- rexBuilder.makeCall(SqlStdOperatorTable.CASE, filterRef,
- argRef.left,
- rexBuilder.ensureType(argRef.left.getType(),
- rexBuilder.constantNull(), true));
- sourceOf.put(arg, projects.size());
- projects.add(Pair.of(condition, "i$" + argRef.right));
- continue;
- }
- if (sourceOf.get(arg) != null) {
- continue;
- }
- sourceOf.put(arg, projects.size());
- projects.add(RexInputRef.of2(arg, childFields));
- }
- relBuilder.project(Pair.left(projects), Pair.right(projects));
-
- // Get the distinct values of the GROUP BY fields and the arguments
- // to the agg functions.
- relBuilder.push(
- aggregate.copy(aggregate.getTraitSet(), relBuilder.build(), false,
- ImmutableBitSet.range(projects.size()),
- null, ImmutableList.<AggregateCall>of()));
- return relBuilder;
- }
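
The FILTER branch of createSelectDistinct can be pictured as a SQL rewrite. A
sketch with hypothetical names (Emps, is_manager); the projected column, which
the code above prefixes with "i$", is called filtered_sal here for readability,
and the rule itself operates on relational expressions, not SQL text:

    -- COUNT(DISTINCT sal) FILTER (WHERE is_manager) ... GROUP BY deptno
    -- is implemented roughly as:
    SELECT deptno, COUNT(filtered_sal)
    FROM (
      SELECT DISTINCT deptno,
             CASE WHEN is_manager THEN sal ELSE NULL END AS filtered_sal
      FROM Emps) t
    GROUP BY deptno
    -- COUNT skips NULLs, so filtered-out rows do not contribute; as the
    -- comment above notes, this fails only for the rare aggregate
    -- functions that need to see null values.
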
-}
-
-// End AggregateExpandDistinctAggregatesRule.java
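
As the FlinkRuleSets change below shows, the class deleted here is replaced by
Calcite's built-in AggregateExpandDistinctAggregatesRule.JOIN. For reference, a
query shape that this expansion targets (illustrative):

    SELECT deptno, COUNT(DISTINCT sal), MAX(age)
    FROM Emps
    GROUP BY deptno
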
http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index ca55473..41f095f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -20,8 +20,7 @@ package org.apache.flink.table.plan.rules
import org.apache.calcite.rel.rules._
import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule
-import org.apache.flink.table.plan.rules.common.WindowStartEndPropertiesRule
+import org.apache.flink.table.plan.rules.common._
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
@@ -87,6 +86,8 @@ object FlinkRuleSets {
AggregateJoinTransposeRule.EXTENDED,
// aggregate union rule
AggregateUnionAggregateRule.INSTANCE,
+ // expand distinct aggregates to normal aggregates with GROUP BY
+ AggregateExpandDistinctAggregatesRule.JOIN,
// remove unnecessary sort rule
SortRemoveRule.INSTANCE,
@@ -107,9 +108,6 @@ object FlinkRuleSets {
ProjectToCalcRule.INSTANCE,
CalcMergeRule.INSTANCE,
- // distinct aggregate rule for FLINK-3475
- FlinkAggregateExpandDistinctAggregatesRule.JOIN,
-
// translate to Flink DataSet nodes
DataSetWindowAggregateRule.INSTANCE,
DataSetAggregateRule.INSTANCE,