You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/28 23:43:06 UTC
flink git commit: [FLINK-5921] [table] Add custom data types for
rowtime and proctime.
Repository: flink
Updated Branches:
refs/heads/master 3086af534 -> 2a1a9c1e3
[FLINK-5921] [table] Add custom data types for rowtime and proctime.
- proctime() and rowtime() are translated to constant zero timestamp.
This closes #3425.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a1a9c1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a1a9c1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a1a9c1e
Branch: refs/heads/master
Commit: 2a1a9c1e31faedb76c990a4f9405837600d770f8
Parents: 3086af5
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Feb 26 00:28:54 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 1 00:42:19 2017 +0100
----------------------------------------------------------------------
.../table/codegen/calls/FunctionGenerator.scala | 14 +++-
.../functions/TimeModeIndicatorFunctions.scala | 80 +++++++++++++++++++-
.../datastream/LogicalWindowAggregateRule.scala | 71 +++++++++--------
.../scala/stream/sql/WindowAggregateTest.scala | 8 +-
4 files changed, 135 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index dfc9055..7d55957 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -28,7 +28,9 @@ import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
+import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
import scala.collection.mutable
@@ -327,6 +329,15 @@ object FunctionGenerator {
)
)
+ // generate a constant for time indicator functions.
+ // this is a temporary solution and will be removed when FLINK-5884 is implemented.
+ case ProcTimeExtractor | EventTimeExtractor =>
+ Some(new CallGenerator {
+ override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {
+ GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP)
+ }
+ })
+
// built-in scalar function
case _ =>
sqlFunctions.get((sqlOperator, operandTypes))
@@ -336,6 +347,7 @@ object FunctionGenerator {
case (x: BasicTypeInfo[_], y: BasicTypeInfo[_]) => y.shouldAutocastTo(x) || x == y
case _ => false
}).map(_._2))
+
}
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
index b9b66ea..3ddcbdc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -17,8 +17,12 @@
*/
package org.apache.flink.table.functions
+import java.nio.charset.Charset
+import java.util
+
+import org.apache.calcite.rel.`type`._
import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
import org.apache.calcite.sql.validate.SqlMonotonicity
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
@@ -26,7 +30,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.expressions.LeafExpression
object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
- ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+ ReturnTypes.explicit(TimeModeTypes.ROWTIME), null, OperandTypes.NILADIC,
SqlFunctionCategory.SYSTEM) {
override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
@@ -35,7 +39,7 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
}
object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
- ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+ ReturnTypes.explicit(TimeModeTypes.PROCTIME), null, OperandTypes.NILADIC,
SqlFunctionCategory.SYSTEM) {
override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
@@ -61,3 +65,73 @@ abstract class TimeIndicator extends LeafExpression {
case class RowTime() extends TimeIndicator
case class ProcTime() extends TimeIndicator
+
+object TimeModeTypes {
+
+ // indicator data type for row time (event time)
+ val ROWTIME = new RowTimeType
+ // indicator data type for processing time
+ val PROCTIME = new ProcTimeType
+
+}
+
+class RowTimeType extends TimeModeType {
+
+ override def toString(): String = "ROWTIME"
+ override def getFullTypeString: String = "ROWTIME_INDICATOR"
+}
+
+class ProcTimeType extends TimeModeType {
+
+ override def toString(): String = "PROCTIME"
+ override def getFullTypeString: String = "PROCTIME_INDICATOR"
+}
+
+abstract class TimeModeType extends RelDataType {
+
+ override def getComparability: RelDataTypeComparability = RelDataTypeComparability.NONE
+
+ override def isStruct: Boolean = false
+
+ override def getFieldList: util.List[RelDataTypeField] = null
+
+ override def getFieldNames: util.List[String] = null
+
+ override def getFieldCount: Int = 0
+
+ override def getStructKind: StructKind = StructKind.NONE
+
+ override def getField(
+ fieldName: String,
+ caseSensitive: Boolean,
+ elideRecord: Boolean): RelDataTypeField = null
+
+ override def isNullable: Boolean = false
+
+ override def getComponentType: RelDataType = null
+
+ override def getKeyType: RelDataType = null
+
+ override def getValueType: RelDataType = null
+
+ override def getCharset: Charset = null
+
+ override def getCollation: SqlCollation = null
+
+ override def getIntervalQualifier: SqlIntervalQualifier = null
+
+ override def getPrecision: Int = -1
+
+ override def getScale: Int = -1
+
+ override def getSqlTypeName: SqlTypeName = SqlTypeName.TIMESTAMP
+
+ override def getSqlIdentifier: SqlIdentifier = null
+
+ override def getFamily: RelDataTypeFamily = SqlTypeFamily.NUMERIC
+
+ override def getPrecedenceList: RelDataTypePrecedenceList = ???
+
+ override def isDynamicStruct: Boolean = false
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index f5eb5f9..37a1b7d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -17,22 +17,19 @@
*/
package org.apache.flink.table.plan.rules.datastream
-import java.util.Calendar
-
import com.google.common.collect.ImmutableList
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.calcite.plan._
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
-import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlFloorFunction
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.api.scala.Tumble
-import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.api.{EventTimeWindow, TableException, TumblingWindow, Window}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
+import org.apache.flink.table.functions.TimeModeTypes
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -68,7 +65,18 @@ class LogicalWindowAggregateRule
val builder = call.builder()
val rexBuilder = builder.getRexBuilder
- val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+ // build dummy literal with type depending on time semantics
+ val zero = window match {
+ case _: EventTimeWindow =>
+ rexBuilder.makeAbstractCast(
+ TimeModeTypes.ROWTIME,
+ rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
+ case _ =>
+ rexBuilder.makeAbstractCast(
+ TimeModeTypes.PROCTIME,
+ rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+ }
val newAgg = builder
.push(project.getInput)
@@ -90,52 +98,58 @@ class LogicalWindowAggregateRule
private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = {
val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
- val key = agg.getGroupSet.asList()
- val fields = key.flatMap(x => nodeToMaybeWindow(project.getProjects.get(x)) match {
- case Some(w) => Some(x.toInt, w)
- case _ => None
- })
- fields.size match {
+ val groupKeys = agg.getGroupSet
+
+ // filter expressions on which is grouped
+ val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
+
+ // check for window expressions in group expressions
+ val windowExpr = groupExpr
+ .map(g => (g._2, identifyWindow(g._1)) )
+ .filter(_._2.isDefined)
+ .map(g => (g._1, g._2.get) )
+
+ windowExpr.size match {
case 0 => None
- case 1 => Some(fields.head)
+ case 1 => Some(windowExpr.head)
case _ => throw new TableException("Multiple windows are not supported")
}
}
- private def nodeToMaybeWindow(field: RexNode): Option[Window] = {
+ private def identifyWindow(field: RexNode): Option[Window] = {
+ // Detects window expressions by pattern matching
+ // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
+ // with time being equal to proctime() or rowtime()
field match {
case call: RexCall =>
call.getOperator match {
case _: SqlFloorFunction =>
- val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1))
+ val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
+ val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange]
val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
- return LogicalWindowAggregateRule.decorateTimeIndicator(
- call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w)
+ call.getType match {
+ case TimeModeTypes.PROCTIME =>
+ return Some(w)
+ case TimeModeTypes.ROWTIME =>
+ return Some(w.on("rowtime"))
+ case _ =>
+ }
case _ =>
}
case _ =>
}
None
}
+
}
object LogicalWindowAggregateRule {
- private[flink] val TIMESTAMP_ZERO = Calendar.getInstance()
- TIMESTAMP_ZERO.setTimeInMillis(0)
private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
private[flink] val INSTANCE = new LogicalWindowAggregateRule
- private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = {
- operator match {
- case EventTimeExtractor => Some(window.on("rowtime"))
- case ProcTimeExtractor => Some(window)
- case _ => None
- }
- }
-
private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = {
intervalToTumbleWindow(range.startUnit.multiplier.longValue())
}
@@ -144,8 +158,5 @@ object LogicalWindowAggregateRule {
Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
}
- private def getLiteral[T](node: RexNode): T = {
- node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 06088ab..c1d39aa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -40,7 +40,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0")
+ term("select", "CAST(1970-01-01 00:00:00) AS $f0")
),
term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)),
term("select", "COUNT(*) AS EXPR$0")
@@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "1970-01-01 00:00:00 AS $f1")
+ term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1")
),
term("groupBy", "a"),
term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)),
@@ -83,7 +83,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c")
+ term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1, b, c")
),
term("groupBy", "a, b"),
term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)),
@@ -105,7 +105,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "1970-01-01 00:00:00 AS $f0")
+ term("select", "CAST(1970-01-01 00:00:00) AS $f0")
),
term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)),
term("select", "COUNT(*) AS EXPR$0")