You are viewing a plain text version of this content. The canonical HTML version is available from the Apache mailing-list archive.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/24 16:31:47 UTC
[2/3] flink git commit: [FLINK-5710] [table] Add proctime() function
to indicate processing time in Stream SQL.
[FLINK-5710] [table] Add proctime() function to indicate processing time in Stream SQL.
This closes #3370.
This closes #3302. // duplicate of #3370
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a755de27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a755de27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a755de27
Branch: refs/heads/master
Commit: a755de27b85fe72be4a6f2063225ddc5c7f69058
Parents: 8c78aba
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Feb 23 13:51:45 2017 -0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Feb 24 17:12:42 2017 +0100
----------------------------------------------------------------------
.../functions/TimeModeIndicatorFunctions.scala | 10 ++++++
.../datastream/LogicalWindowAggregateRule.scala | 38 +++++++++++++-------
.../flink/table/validate/FunctionCatalog.scala | 10 +++---
.../scala/stream/sql/WindowAggregateTest.scala | 23 +++++++++++-
4 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
index 7a7e00f..b9b66ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -34,6 +34,15 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
SqlMonotonicity.INCREASING
}
+object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+ SqlFunctionCategory.SYSTEM) {
+ override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+ override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+ SqlMonotonicity.INCREASING
+}
+
abstract class TimeIndicator extends LeafExpression {
/**
* Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]]
@@ -51,3 +60,4 @@ abstract class TimeIndicator extends LeafExpression {
}
case class RowTime() extends TimeIndicator
+case class ProcTime() extends TimeIndicator
http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index 094e47b..f5eb5f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -25,14 +25,16 @@ import org.apache.calcite.plan._
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlFloorFunction
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.api.scala.Tumble
import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import scala.collection.JavaConversions._
@@ -104,15 +106,11 @@ class LogicalWindowAggregateRule
field match {
case call: RexCall =>
call.getOperator match {
- case _: SqlFloorFunction => call.getOperands.get(0) match {
- case c: RexCall => if (c.getOperator == EventTimeExtractor) {
- val unit = call.getOperands.get(1)
- .asInstanceOf[RexLiteral].getValue.asInstanceOf[TimeUnitRange]
- return Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(unit)
- .on("rowtime"))
- }
- case _ =>
- }
+ case _: SqlFloorFunction =>
+ val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1))
+ val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
+ return LogicalWindowAggregateRule.decorateTimeIndicator(
+ call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w)
case _ =>
}
case _ =>
@@ -130,10 +128,24 @@ object LogicalWindowAggregateRule {
private[flink] val INSTANCE = new LogicalWindowAggregateRule
- private val EXPR_ONE = ExpressionParser.parseExpression("1")
+ private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = {
+ operator match {
+ case EventTimeExtractor => Some(window.on("rowtime"))
+ case ProcTimeExtractor => Some(window)
+ case _ => None
+ }
+ }
+
+ private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = {
+ intervalToTumbleWindow(range.startUnit.multiplier.longValue())
+ }
+
+ private def intervalToTumbleWindow(size: Long): TumblingWindow = {
+ Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+ }
- def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = {
- Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, range.startUnit.multiplier.longValue())
+ private def getLiteral[T](node: RexNode): T = {
+ node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 94237f7..3c89ec4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction}
-import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
+import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
+import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -197,7 +197,8 @@ object FunctionCatalog {
// "ceil" -> classOf[TemporalCeil]
// extensions to support streaming query
- "rowtime" -> classOf[RowTime]
+ "rowtime" -> classOf[RowTime],
+ "proctime" -> classOf[ProcTime]
)
/**
@@ -322,7 +323,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.SCALAR_QUERY,
SqlStdOperatorTable.EXISTS,
// EXTENSIONS
- EventTimeExtractor
+ EventTimeExtractor,
+ ProcTimeExtractor
)
builtInSqlOperators.foreach(register)
http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 183b84c..06088ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeTumblingGroupWindow}
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
@@ -94,6 +94,27 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+ @Test
+ def testProcessingTime() = {
+ val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(proctime() TO HOUR)"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "1970-01-01 00:00:00 AS $f0")
+ ),
+ term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)),
+ term("select", "COUNT(*) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
@Test(expected = classOf[TableException])
def testMultiWindow() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +