You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/16 17:39:18 UTC
flink git commit: [FLINK-5624] [table] Add SQL support for tumbling
windows on streaming tables.
Repository: flink
Updated Branches:
refs/heads/master f24514339 -> 8304f3e15
[FLINK-5624] [table] Add SQL support for tumbling windows on streaming tables.
This closes #3252.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8304f3e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8304f3e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8304f3e1
Branch: refs/heads/master
Commit: 8304f3e159851d29691e66cacfcb4278d73a8b97
Parents: f245143
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Feb 14 13:28:44 2017 -0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Feb 16 18:33:33 2017 +0100
----------------------------------------------------------------------
.../functions/TimeModeIndicatorFunctions.scala | 53 +++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +
.../datastream/LogicalWindowAggregateRule.scala | 139 +++++++++++++++++++
.../flink/table/validate/FunctionCatalog.scala | 11 +-
.../scala/stream/sql/WindowAggregateTest.scala | 111 +++++++++++++++
5 files changed, 314 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
new file mode 100644
index 0000000..7a7e00f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+
+/** Calcite SQL function ROWTIME(): no operands, TIMESTAMP result, monotonically increasing. */
+object EventTimeExtractor extends SqlFunction(
+    "ROWTIME", SqlKind.OTHER_FUNCTION, ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+    null, OperandTypes.NILADIC, SqlFunctionCategory.SYSTEM) {
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+}
+
+/**
+ * Base class for time indicator expressions such as `rowtime()`.
+ * An indicator has a timestamp result type but cannot be translated
+ * into an executable Calcite expression.
+ */
+abstract class TimeIndicator extends LeafExpression {
+  /** Returns the result type of this expression, which is always a SQL timestamp. */
+  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  /** Always throws a [[TableException]]; indicators have no RexNode representation. */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    throw new TableException("indicator functions (e.g. proctime() and rowtime())" +
+      " are not executable. Please check your expressions.")
+}
+
+/** Time indicator expression, registered in the function catalog as "rowtime". */
+case class RowTime() extends TimeIndicator
http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a24a06d..f9c8d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -125,6 +125,9 @@ object FlinkRuleSets {
* RuleSet to normalize plans for stream / DataStream execution
*/
val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
+ // Transform window to LogicalWindowAggregate
+ LogicalWindowAggregateRule.INSTANCE,
+
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..094e47b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import java.util.Calendar
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlFloorFunction
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api.scala.Tumble
+import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+/** Rewrites aggregates grouped by `FLOOR(rowtime() TO unit)` into tumbling window aggregates. */
+class LogicalWindowAggregateRule
+  extends RelOptRule(
+    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
+    "LogicalWindowAggregateRule") {
+
+  /** Matches non-DISTINCT, single-grouping-set aggregates that group on one window expression. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel[LogicalAggregate](0)
+    val window = recognizeWindow(agg)
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    val oneGroupSet = agg.getGroupSets.size() == 1 && agg.getGroupSets.get(0) == agg.getGroupSet
+    window.isDefined && !distinctAggs && oneGroupSet && !agg.indicator
+  }
+
+  /**
+    * Transform LogicalAggregate with windowing expression to LogicalProject
+    * + LogicalWindowAggregate + LogicalProject.
+    *
+    * The transformation adds an additional LogicalProject at the top to ensure
+    * that the types are equivalent.
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val agg = call.rel[LogicalAggregate](0)
+    val project = inputProject(agg)
+    val (windowExprIdx, window) = recognizeWindow(agg).getOrElse(
+      throw new TableException("LogicalWindowAggregateRule requires a window expression"))
+    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+
+    val builder = call.builder()
+    val zero = builder.getRexBuilder
+      .makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+    // The window expression becomes a constant and is dropped from the grouping keys.
+    val newAgg = builder
+      .push(project.getInput)
+      .project(project.getChildExps.updated(windowExprIdx, zero))
+      .aggregate(
+        builder.groupKey(newGroupSet, agg.indicator, ImmutableList.of(newGroupSet)),
+        agg.getAggCallList)
+      .build().asInstanceOf[LogicalAggregate]
+
+    // Create an additional project to conform with types
+    val transformed = call.builder()
+    transformed
+      .push(LogicalWindowAggregate.create(
+        window.toLogicalWindow, Seq[NamedWindowProperty](), newAgg))
+      .project(transformed.fields().patch(windowExprIdx, Seq(zero), 0))
+    call.transformTo(transformed.build())
+  }
+
+  /** Returns the LogicalProject below `agg`, unwrapping the planner's HepRelVertex. */
+  private def inputProject(agg: LogicalAggregate): LogicalProject =
+    agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+
+  /** Returns index and window of the single window grouping key; fails if there are several. */
+  private def recognizeWindow(agg: LogicalAggregate): Option[(Int, Window)] = {
+    val projects = inputProject(agg).getProjects
+    val windows = agg.getGroupSet.asList().toList.flatMap { idx =>
+      nodeToMaybeWindow(projects.get(idx)).map(window => (idx.toInt, window))
+    }
+    windows match {
+      case Nil => None
+      case single :: Nil => Some(single)
+      case _ => throw new TableException("Multiple windows are not supported")
+    }
+  }
+
+  /** Translates `FLOOR(rowtime() TO unit)` into a tumbling window on "rowtime", else None. */
+  private def nodeToMaybeWindow(field: RexNode): Option[Window] = field match {
+    case floor: RexCall if floor.getOperator.isInstanceOf[SqlFloorFunction] =>
+      floor.getOperands.toList match {
+        case (timeCall: RexCall) :: (unitLit: RexLiteral) :: _
+            if timeCall.getOperator == EventTimeExtractor =>
+          unitLit.getValue match {
+            case range: TimeUnitRange =>
+              Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(range).on("rowtime"))
+            case _ => None
+          }
+        case _ => None
+      }
+    case _ => None
+  }
+}
+
+object LogicalWindowAggregateRule {
+  private[flink] val TIMESTAMP_ZERO = Calendar.getInstance()
+  TIMESTAMP_ZERO.setTimeInMillis(0)
+  // Declared before INSTANCE because the rule's constructor reads this operand.
+  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
+    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
+  private[flink] val INSTANCE = new LogicalWindowAggregateRule
+  private val EXPR_ONE = ExpressionParser.parseExpression("1")
+
+  /** Tumbling window of one start unit; rejects year-month units and multipliers below 1 ms. */
+  def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = range.startUnit match {
+    case u if u.yearMonth || u.multiplier == null || u.multiplier.longValue() < 1 =>
+      throw new TableException(s"Unsupported time unit for tumbling windows: $u")
+    case u => Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, u.multiplier.longValue())
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index c00f8bb..207eba1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
+import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -190,11 +190,14 @@ object FunctionCatalog {
// array
"cardinality" -> classOf[ArrayCardinality],
"at" -> classOf[ArrayElementAt],
- "element" -> classOf[ArrayElement]
+ "element" -> classOf[ArrayElement],
// TODO implement function overloading here
// "floor" -> classOf[TemporalFloor]
// "ceil" -> classOf[TemporalCeil]
+
+ // extensions to support streaming query
+ "rowtime" -> classOf[RowTime]
)
/**
@@ -317,7 +320,9 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.EXTRACT,
SqlStdOperatorTable.QUARTER,
SqlStdOperatorTable.SCALAR_QUERY,
- SqlStdOperatorTable.EXISTS
+ SqlStdOperatorTable.EXISTS,
+ // EXTENSIONS
+ EventTimeExtractor
)
builtInSqlOperators.foreach(register)
http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
new file mode 100644
index 0000000..183b84c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/** Plan tests for SQL tumbling windows expressed as `GROUP BY FLOOR(rowtime() TO unit)`. */
+class WindowAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+  /** The window expression is the only grouping key. */
+  @Test
+  def testNonPartitionedTumbleWindow(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "1970-01-01 00:00:00 AS $f0"))
+    val windowAgg = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)),
+      term("select", "COUNT(*) AS EXPR$0"))
+    val plan = unaryNode(
+      "DataStreamCalc",
+      windowAgg,
+      term("select", "EXPR$0"))
+    streamUtil.verifySql(query, plan)
+  }
+
+  /** The window expression follows a regular grouping key. */
+  @Test
+  def testPartitionedTumbleWindow1(): Unit = {
+    val query = "SELECT a, COUNT(*) FROM MyTable GROUP BY a, FLOOR(rowtime() TO MINUTE)"
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "1970-01-01 00:00:00 AS $f1"))
+    val windowAgg = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "a"),
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)),
+      term("select", "a", "COUNT(*) AS EXPR$1"))
+    val plan = unaryNode(
+      "DataStreamCalc",
+      windowAgg,
+      term("select", "a", "EXPR$1"))
+    streamUtil.verifySql(query, plan)
+  }
+
+  /** The window expression sits between two regular grouping keys. */
+  @Test
+  def testPartitionedTumbleWindow2(): Unit = {
+    val query = "SELECT a, SUM(c), b FROM MyTable GROUP BY a, FLOOR(rowtime() TO SECOND), b"
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c"))
+    val windowAgg = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "a, b"),
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)),
+      term("select", "a", "b", "SUM(c) AS EXPR$1"))
+    val plan = unaryNode(
+      "DataStreamCalc",
+      windowAgg,
+      term("select", "a", "EXPR$1", "b"))
+    streamUtil.verifySql(query, plan)
+  }
+
+  /** Two window expressions in the grouping keys are expected to fail with a TableException. */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY " +
+      "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
+    val plan = ""
+    streamUtil.verifySql(query, plan)
+  }
+
+  /** A FLOOR over localTimestamp is expected to fail with a TableException. */
+  @Test(expected = classOf[TableException])
+  def testInvalidWindowExpression(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)"
+    val plan = ""
+    streamUtil.verifySql(query, plan)
+  }
+}