You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/16 17:39:18 UTC

flink git commit: [FLINK-5624] [table] Add SQL support for tumbling windows on streaming tables.

Repository: flink
Updated Branches:
  refs/heads/master f24514339 -> 8304f3e15


[FLINK-5624] [table] Add SQL support for tumbling windows on streaming tables.

This closes #3252.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8304f3e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8304f3e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8304f3e1

Branch: refs/heads/master
Commit: 8304f3e159851d29691e66cacfcb4278d73a8b97
Parents: f245143
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Feb 14 13:28:44 2017 -0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Feb 16 18:33:33 2017 +0100

----------------------------------------------------------------------
 .../functions/TimeModeIndicatorFunctions.scala  |  53 +++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +
 .../datastream/LogicalWindowAggregateRule.scala | 139 +++++++++++++++++++
 .../flink/table/validate/FunctionCatalog.scala  |  11 +-
 .../scala/stream/sql/WindowAggregateTest.scala  | 111 +++++++++++++++
 5 files changed, 314 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
new file mode 100644
index 0000000..7a7e00f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+
+/**
+  * Calcite SQL function `ROWTIME()`.
+  *
+  * It is declared as a niladic system function whose return type is explicitly TIMESTAMP.
+  */
+object EventTimeExtractor
+  extends SqlFunction(
+    "ROWTIME",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+    null,
+    OperandTypes.NILADIC,
+    SqlFunctionCategory.SYSTEM) {
+
+  /** The function is written using regular function-call syntax. */
+  override def getSyntax: SqlSyntax = {
+    SqlSyntax.FUNCTION
+  }
+
+  /** Reports the result as increasing, regardless of the given call binding. */
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = {
+    SqlMonotonicity.INCREASING
+  }
+}
+
+/**
+  * Base class for time indicator expressions such as `rowtime()`.
+  *
+  * An indicator declares a TIMESTAMP result type but cannot be converted into a Calcite
+  * expression; any attempt to do so fails with a [[TableException]].
+  */
+abstract class TimeIndicator extends LeafExpression {
+  /**
+    * Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]]
+    * for evaluating this expression.
+    * It is sometimes not available until the expression is valid.
+    */
+  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  /**
+    * Convert Expression to its counterpart in Calcite, i.e. RexNode.
+    *
+    * Indicators have no executable counterpart, so this always throws a [[TableException]].
+    */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    throw new TableException("indicator functions (e.g. proctime() and rowtime())" +
+      " are not executable. Please check your expressions.")
+}
+
+/** Time indicator expression for `rowtime()`. */
+case class RowTime() extends TimeIndicator

http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a24a06d..f9c8d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -125,6 +125,9 @@ object FlinkRuleSets {
     * RuleSet to normalize plans for stream / DataStream execution
     */
   val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
+    // Transform window to LogicalWindowAggregate
+    LogicalWindowAggregateRule.INSTANCE,
+
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,
     ReduceExpressionsRule.PROJECT_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..094e47b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import java.util.Calendar
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlFloorFunction
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api.scala.Tumble
+import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites a [[LogicalAggregate]] over a [[LogicalProject]] into a
+  * [[LogicalWindowAggregate]] when exactly one grouping key is a call of the form
+  * `FLOOR(rowtime() TO <unit>)`.
+  */
+class LogicalWindowAggregateRule
+  extends RelOptRule(
+    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
+    "LogicalWindowAggregateRule") {
+
+  /**
+    * The rule applies to aggregates without DISTINCT calls, without grouping sets or an
+    * indicator, and with a recognizable window expression among the grouping keys.
+    *
+    * @throws TableException if more than one grouping key is a window expression
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel[LogicalAggregate](0)
+
+    val hasDistinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    val hasGroupingSets =
+      agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+
+    // Evaluated unconditionally so that multiple window expressions are always reported.
+    val windowClause = recognizeWindow(agg)
+    !hasDistinctAggs && !hasGroupingSets && !agg.indicator && windowClause.isDefined
+  }
+
+  /**
+    * Transform LogicalAggregate with windowing expression to LogicalProject
+    * + LogicalWindowAggregate + LogicalProject.
+    *
+    * The window expression is replaced by a constant timestamp literal in the lower project
+    * and removed from the grouping keys. The transformation adds an additional LogicalProject
+    * at the top, which re-inserts the constant, to ensure that the types are equivalent.
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val agg = call.rel[LogicalAggregate](0)
+    val project = inputProject(agg)
+    // matches() only succeeds if a window was recognized, so the fallback is defensive.
+    val (windowExprIdx, window) = recognizeWindow(agg).getOrElse(
+      throw new TableException("No window expression found in the grouping keys"))
+    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+
+    val builder = call.builder()
+    val rexBuilder = builder.getRexBuilder
+    val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+    val newAgg = builder
+      .push(project.getInput)
+      .project(project.getChildExps.updated(windowExprIdx, zero))
+      .aggregate(builder.groupKey(
+        newGroupSet,
+        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
+      .build().asInstanceOf[LogicalAggregate]
+
+    // Create an additional project to conform with types
+    val transformed = call.builder()
+    transformed.push(LogicalWindowAggregate.create(
+      window.toLogicalWindow,
+      Seq[NamedWindowProperty](),
+      newAgg))
+      .project(transformed.fields().patch(windowExprIdx, Seq(zero), 0))
+    call.transformTo(transformed.build())
+  }
+
+  /**
+    * Unwraps the project below the aggregate.
+    *
+    * The rule operand requires a LogicalProject input; the input itself is expected to be
+    * wrapped in a HepRelVertex (presumably because the rule runs in a HepPlanner).
+    */
+  private def inputProject(agg: LogicalAggregate): LogicalProject =
+    agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+
+  /**
+    * Searches the grouping keys of the aggregate for a window expression.
+    *
+    * @return the index of the window expression in the input project together with the
+    *         window, or None if no grouping key is a window expression
+    * @throws TableException if more than one grouping key is a window expression
+    */
+  private def recognizeWindow(agg: LogicalAggregate): Option[(Int, Window)] = {
+    val project = inputProject(agg)
+    val windows = agg.getGroupSet.asList().flatMap { key =>
+      nodeToMaybeWindow(project.getProjects.get(key)).map(window => (key.intValue(), window))
+    }
+    windows match {
+      case Seq() => None
+      case Seq(single) => Some(single)
+      case _ => throw new TableException("Multiple windows are not supported")
+    }
+  }
+
+  /** Returns true if the node is a call to the ROWTIME() function. */
+  private def isRowtimeCall(node: RexNode): Boolean = node match {
+    case c: RexCall => c.getOperator == EventTimeExtractor
+    case _ => false
+  }
+
+  /**
+    * Translates a call whose operator is a SqlFloorFunction, whose first operand is a
+    * ROWTIME() call and whose second operand is a time unit literal into a tumbling window
+    * on "rowtime". Every other expression yields None.
+    */
+  private def nodeToMaybeWindow(field: RexNode): Option[Window] = field match {
+    // NOTE(review): SqlFloorFunction presumably also backs CEIL - confirm whether CEIL
+    // should be accepted here.
+    case call: RexCall if call.getOperator.isInstanceOf[SqlFloorFunction] =>
+      val operands = call.getOperands
+      if (operands.size() == 2 && isRowtimeCall(operands.get(0))) {
+        operands.get(1) match {
+          case unitLiteral: RexLiteral =>
+            unitLiteral.getValue match {
+              case unit: TimeUnitRange =>
+                Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(unit)
+                  .on("rowtime"))
+              case _ => None
+            }
+          case _ => None
+        }
+      } else {
+        None
+      }
+    case _ => None
+  }
+}
+
+/** Constants and helpers of the [[LogicalWindowAggregateRule]]. */
+object LogicalWindowAggregateRule {
+  /** Calendar set to 0 milliseconds; used as the placeholder for the window expression. */
+  private[flink] val TIMESTAMP_ZERO: Calendar = {
+    val epoch = Calendar.getInstance()
+    epoch.setTimeInMillis(0)
+    epoch
+  }
+
+  /** Operand pattern of the rule: a LogicalAggregate on top of a LogicalProject. */
+  private[flink] val LOGICAL_WINDOW_PREDICATE: RelOptRuleOperand =
+    RelOptRule.operand(classOf[LogicalAggregate],
+      RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
+
+  private[flink] val INSTANCE: LogicalWindowAggregateRule = new LogicalWindowAggregateRule
+
+  private val EXPR_ONE = ExpressionParser.parseExpression("1")
+
+  /**
+    * Creates a tumbling window whose size is one unit of the given time unit range.
+    *
+    * The size is derived from the multiplier of the range's start unit, interpreted as
+    * milliseconds. Year-month units (e.g. YEAR, MONTH) express their multiplier in months and
+    * have no fixed length in milliseconds, so they are rejected instead of silently producing
+    * a window of a few milliseconds.
+    *
+    * @param range the time unit range of the FLOOR call
+    * @return a tumbling window spanning one start unit of the range
+    * @throws TableException if the start unit of the range is a year-month unit
+    */
+  def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = {
+    val unit = range.startUnit
+    if (unit.yearMonth) {
+      throw new TableException(
+        s"Tumbling windows over the time unit $unit are not supported")
+    }
+    Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, unit.multiplier.longValue())
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index c00f8bb..207eba1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
 import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
+import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -190,11 +190,14 @@ object FunctionCatalog {
     // array
     "cardinality" -> classOf[ArrayCardinality],
     "at" -> classOf[ArrayElementAt],
-    "element" -> classOf[ArrayElement]
+    "element" -> classOf[ArrayElement],
 
     // TODO implement function overloading here
     // "floor" -> classOf[TemporalFloor]
     // "ceil" -> classOf[TemporalCeil]
+
+    // extensions to support streaming query
+    "rowtime" -> classOf[RowTime]
   )
 
   /**
@@ -317,7 +320,9 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.EXTRACT,
     SqlStdOperatorTable.QUARTER,
     SqlStdOperatorTable.SCALAR_QUERY,
-    SqlStdOperatorTable.EXISTS
+    SqlStdOperatorTable.EXISTS,
+    // EXTENSIONS
+    EventTimeExtractor
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/8304f3e1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
new file mode 100644
index 0000000..183b84c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Plan tests for SQL tumbling windows expressed as `GROUP BY FLOOR(rowtime() TO <unit>)`
+  * on a streaming table.
+  */
+class WindowAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+  /** A window without additional grouping keys: hourly tumbling window, no groupBy term. */
+  @Test
+  def testNonPartitionedTumbleWindow(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
+
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "1970-01-01 00:00:00 AS $f0"))
+    val windowAggregate = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)),
+      term("select", "COUNT(*) AS EXPR$0"))
+    val outputCalc = unaryNode(
+      "DataStreamCalc",
+      windowAggregate,
+      term("select", "EXPR$0"))
+
+    streamUtil.verifySql(query, outputCalc)
+  }
+
+  /** A window combined with one grouping key listed before the window expression. */
+  @Test
+  def testPartitionedTumbleWindow1(): Unit = {
+    val query = "SELECT a, COUNT(*) FROM MyTable GROUP BY a, FLOOR(rowtime() TO MINUTE)"
+
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "1970-01-01 00:00:00 AS $f1"))
+    val windowAggregate = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "a"),
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)),
+      term("select", "a", "COUNT(*) AS EXPR$1"))
+    val outputCalc = unaryNode(
+      "DataStreamCalc",
+      windowAggregate,
+      term("select", "a", "EXPR$1"))
+
+    streamUtil.verifySql(query, outputCalc)
+  }
+
+  /** A window expression placed between two grouping keys. */
+  @Test
+  def testPartitionedTumbleWindow2(): Unit = {
+    val query = "SELECT a, SUM(c), b FROM MyTable GROUP BY a, FLOOR(rowtime() TO SECOND), b"
+
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c"))
+    val windowAggregate = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "a, b"),
+      term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)),
+      term("select", "a", "b", "SUM(c) AS EXPR$1"))
+    val outputCalc = unaryNode(
+      "DataStreamCalc",
+      windowAggregate,
+      term("select", "a", "EXPR$1", "b"))
+
+    streamUtil.verifySql(query, outputCalc)
+  }
+
+  /** Two window expressions in one GROUP BY must fail with a TableException. */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY " +
+      "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
+    // The plan is never compared because the test expects an exception.
+    val unusedPlan = ""
+    streamUtil.verifySql(query, unusedPlan)
+  }
+
+  /** A FLOOR call that is not based on rowtime() must fail with a TableException. */
+  @Test(expected = classOf[TableException])
+  def testInvalidWindowExpression(): Unit = {
+    val query = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)"
+    // The plan is never compared because the test expects an exception.
+    val unusedPlan = ""
+    streamUtil.verifySql(query, unusedPlan)
+  }
+}