You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/28 23:43:06 UTC

flink git commit: [FLINK-5921] [table] Add custom data types for rowtime and proctime.

Repository: flink
Updated Branches:
  refs/heads/master 3086af534 -> 2a1a9c1e3


[FLINK-5921] [table] Add custom data types for rowtime and proctime.

- proctime() and rowtime() are translated to a constant zero timestamp.

This closes #3425.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a1a9c1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a1a9c1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a1a9c1e

Branch: refs/heads/master
Commit: 2a1a9c1e31faedb76c990a4f9405837600d770f8
Parents: 3086af5
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Feb 26 00:28:54 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 1 00:42:19 2017 +0100

----------------------------------------------------------------------
 .../table/codegen/calls/FunctionGenerator.scala | 14 +++-
 .../functions/TimeModeIndicatorFunctions.scala  | 80 +++++++++++++++++++-
 .../datastream/LogicalWindowAggregateRule.scala | 71 +++++++++--------
 .../scala/stream/sql/WindowAggregateTest.scala  |  8 +-
 4 files changed, 135 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index dfc9055..7d55957 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -28,7 +28,9 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
+import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 
 import scala.collection.mutable
 
@@ -327,6 +329,15 @@ object FunctionGenerator {
         )
       )
 
+    // generate a constant for time indicator functions.
+    //   this is a temporary solution and will be removed when FLINK-5884 is implemented.
+    case ProcTimeExtractor | EventTimeExtractor =>
+      Some(new CallGenerator {
+        override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {
+          GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP)
+        }
+      })
+
     // built-in scalar function
     case _ =>
       sqlFunctions.get((sqlOperator, operandTypes))
@@ -336,6 +347,7 @@ object FunctionGenerator {
           case (x: BasicTypeInfo[_], y: BasicTypeInfo[_]) => y.shouldAutocastTo(x) || x == y
           case _ => false
         }).map(_._2))
+
   }
 
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
index b9b66ea..3ddcbdc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -17,8 +17,12 @@
  */
 package org.apache.flink.table.functions
 
+import java.nio.charset.Charset
+import java.util
+
+import org.apache.calcite.rel.`type`._
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
@@ -26,7 +30,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.LeafExpression
 
 object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
-  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+  ReturnTypes.explicit(TimeModeTypes.ROWTIME), null, OperandTypes.NILADIC,
   SqlFunctionCategory.SYSTEM) {
   override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
 
@@ -35,7 +39,7 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
 }
 
 object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
-  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+  ReturnTypes.explicit(TimeModeTypes.PROCTIME), null, OperandTypes.NILADIC,
   SqlFunctionCategory.SYSTEM) {
   override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
 
@@ -61,3 +65,73 @@ abstract class TimeIndicator extends LeafExpression {
 
 case class RowTime() extends TimeIndicator
 case class ProcTime() extends TimeIndicator
+
+object TimeModeTypes {
+
+  // indicator data type for row time (event time)
+  val ROWTIME = new RowTimeType
+  // indicator data type for processing time
+  val PROCTIME = new ProcTimeType
+
+}
+
+class RowTimeType extends TimeModeType {
+
+  override def toString(): String = "ROWTIME"
+  override def getFullTypeString: String = "ROWTIME_INDICATOR"
+}
+
+class ProcTimeType extends TimeModeType {
+
+  override def toString(): String = "PROCTIME"
+  override def getFullTypeString: String = "PROCTIME_INDICATOR"
+}
+
+abstract class TimeModeType extends RelDataType {
+
+  override def getComparability: RelDataTypeComparability = RelDataTypeComparability.NONE
+
+  override def isStruct: Boolean = false
+
+  override def getFieldList: util.List[RelDataTypeField] = null
+
+  override def getFieldNames: util.List[String] = null
+
+  override def getFieldCount: Int = 0
+
+  override def getStructKind: StructKind = StructKind.NONE
+
+  override def getField(
+     fieldName: String,
+     caseSensitive: Boolean,
+     elideRecord: Boolean): RelDataTypeField = null
+
+  override def isNullable: Boolean = false
+
+  override def getComponentType: RelDataType = null
+
+  override def getKeyType: RelDataType = null
+
+  override def getValueType: RelDataType = null
+
+  override def getCharset: Charset = null
+
+  override def getCollation: SqlCollation = null
+
+  override def getIntervalQualifier: SqlIntervalQualifier = null
+
+  override def getPrecision: Int = -1
+
+  override def getScale: Int = -1
+
+  override def getSqlTypeName: SqlTypeName = SqlTypeName.TIMESTAMP
+
+  override def getSqlIdentifier: SqlIdentifier = null
+
+  override def getFamily: RelDataTypeFamily = SqlTypeFamily.NUMERIC
+
+  override def getPrecedenceList: RelDataTypePrecedenceList = ???
+
+  override def isDynamicStruct: Boolean = false
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index f5eb5f9..37a1b7d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -17,22 +17,19 @@
  */
 package org.apache.flink.table.plan.rules.datastream
 
-import java.util.Calendar
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.plan._
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
-import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlFloorFunction
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api.scala.Tumble
-import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.api.{EventTimeWindow, TableException, TumblingWindow, Window}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
+import org.apache.flink.table.functions.TimeModeTypes
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
@@ -68,7 +65,18 @@ class LogicalWindowAggregateRule
 
     val builder = call.builder()
     val rexBuilder = builder.getRexBuilder
-    val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+    // build dummy literal with type depending on time semantics
+    val zero = window match {
+      case _: EventTimeWindow =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.ROWTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
+      case _ =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.PROCTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+    }
 
     val newAgg = builder
       .push(project.getInput)
@@ -90,52 +98,58 @@ class LogicalWindowAggregateRule
 
   private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = {
     val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
-    val key = agg.getGroupSet.asList()
-    val fields = key.flatMap(x => nodeToMaybeWindow(project.getProjects.get(x)) match {
-      case Some(w) => Some(x.toInt, w)
-      case _ => None
-    })
-    fields.size match {
+    val groupKeys = agg.getGroupSet
+
+    // filter expressions on which is grouped
+    val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
+
+    // check for window expressions in group expressions
+    val windowExpr = groupExpr
+      .map(g => (g._2, identifyWindow(g._1)) )
+      .filter(_._2.isDefined)
+      .map(g => (g._1, g._2.get) )
+
+    windowExpr.size match {
       case 0 => None
-      case 1 => Some(fields.head)
+      case 1 => Some(windowExpr.head)
       case _ => throw new TableException("Multiple windows are not supported")
     }
   }
 
-  private def nodeToMaybeWindow(field: RexNode): Option[Window] = {
+  private def identifyWindow(field: RexNode): Option[Window] = {
+    // Detects window expressions by pattern matching
+    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
+    //   with time being equal to proctime() or rowtime()
     field match {
       case call: RexCall =>
         call.getOperator match {
           case _: SqlFloorFunction =>
-            val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1))
+            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
+            val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange]
             val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
-            return LogicalWindowAggregateRule.decorateTimeIndicator(
-              call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w)
+            call.getType match {
+              case TimeModeTypes.PROCTIME =>
+                return Some(w)
+              case TimeModeTypes.ROWTIME =>
+                return Some(w.on("rowtime"))
+              case _ =>
+            }
           case _ =>
         }
       case _ =>
     }
     None
   }
+
 }
 
 object LogicalWindowAggregateRule {
-  private[flink] val TIMESTAMP_ZERO = Calendar.getInstance()
-  TIMESTAMP_ZERO.setTimeInMillis(0)
 
   private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
     RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
 
   private[flink] val INSTANCE = new LogicalWindowAggregateRule
 
-  private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = {
-    operator match {
-      case EventTimeExtractor => Some(window.on("rowtime"))
-      case ProcTimeExtractor => Some(window)
-      case _ => None
-    }
-  }
-
   private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = {
     intervalToTumbleWindow(range.startUnit.multiplier.longValue())
   }
@@ -144,8 +158,5 @@ object LogicalWindowAggregateRule {
     Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
   }
 
-  private def getLiteral[T](node: RexNode): T = {
-    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 06088ab..c1d39aa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -40,7 +40,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
+            term("select", "CAST(1970-01-01 00:00:00) AS $f0")
           ),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)),
           term("select", "COUNT(*) AS EXPR$0")
@@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1")
+            term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1")
           ),
           term("groupBy", "a"),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)),
@@ -83,7 +83,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c")
+            term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1, b, c")
           ),
           term("groupBy", "a, b"),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)),
@@ -105,7 +105,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
+            term("select", "CAST(1970-01-01 00:00:00) AS $f0")
           ),
           term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)),
           term("select", "COUNT(*) AS EXPR$0")