You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:39 UTC
[05/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL
[FLINK-5884] [table] Integrate time indicators for Table API & SQL
This closes #3808.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/495f104b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/495f104b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/495f104b
Branch: refs/heads/master
Commit: 495f104b439096dc7eea5141bfe0de0283c5cc62
Parents: 28ab737
Author: twalthr <tw...@apache.org>
Authored: Thu Mar 2 16:06:55 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:31 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 25 +-
.../table/api/StreamTableEnvironment.scala | 79 ++--
.../flink/table/api/TableEnvironment.scala | 101 ++++-
.../flink/table/api/scala/expressionDsl.scala | 18 +-
.../apache/flink/table/api/scala/windows.scala | 2 +-
.../org/apache/flink/table/api/table.scala | 14 +-
.../org/apache/flink/table/api/windows.scala | 357 +++++++--------
.../calcite/FlinkCalciteSqlValidator.scala | 12 +-
.../flink/table/calcite/FlinkPlannerImpl.scala | 6 +-
.../flink/table/calcite/FlinkTypeFactory.scala | 132 +++++-
.../calcite/RelTimeIndicatorConverter.scala | 222 ++++++++++
.../flink/table/codegen/CodeGenerator.scala | 99 +++--
.../table/codegen/calls/FunctionGenerator.scala | 10 -
.../table/expressions/ExpressionUtils.scala | 39 ++
.../apache/flink/table/expressions/call.scala | 16 +-
.../table/expressions/fieldExpression.scala | 48 +-
.../TimeMaterializationSqlFunction.scala | 41 ++
.../functions/TimeModeIndicatorFunctions.scala | 137 ------
.../flink/table/plan/ProjectionTranslator.scala | 31 +-
.../table/plan/logical/LogicalWindow.scala | 14 +-
.../flink/table/plan/logical/groupWindows.scala | 280 ++++--------
.../flink/table/plan/logical/operators.scala | 40 +-
.../flink/table/plan/nodes/CommonCalc.scala | 37 +-
.../table/plan/nodes/CommonCorrelate.scala | 32 +-
.../flink/table/plan/nodes/CommonScan.scala | 24 +-
.../flink/table/plan/nodes/OverAggregate.scala | 35 +-
.../plan/nodes/PhysicalTableSourceScan.scala | 6 +-
.../table/plan/nodes/dataset/BatchScan.scala | 12 +-
.../nodes/dataset/BatchTableSourceScan.scala | 14 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 8 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 21 +-
.../plan/nodes/dataset/DataSetCorrelate.scala | 9 +-
.../nodes/dataset/DataSetWindowAggregate.scala | 68 +--
.../nodes/datastream/DataStreamAggregate.scala | 178 ++++----
.../plan/nodes/datastream/DataStreamCalc.scala | 22 +-
.../nodes/datastream/DataStreamCorrelate.scala | 22 +-
.../datastream/DataStreamOverAggregate.scala | 112 ++---
.../plan/nodes/datastream/DataStreamRel.scala | 1 -
.../plan/nodes/datastream/DataStreamScan.scala | 10 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 14 +-
.../nodes/datastream/DataStreamValues.scala | 21 +-
.../plan/nodes/datastream/StreamScan.scala | 32 +-
.../datastream/StreamTableSourceScan.scala | 58 ++-
.../nodes/logical/FlinkLogicalOverWindow.scala | 2 +-
.../logical/FlinkLogicalTableSourceScan.scala | 6 +-
.../common/WindowStartEndPropertiesRule.scala | 4 +-
.../datastream/DataStreamAggregateRule.scala | 5 +-
.../rules/datastream/DataStreamCalcRule.scala | 4 +-
.../datastream/DataStreamCorrelateRule.scala | 6 +-
.../DataStreamLogicalWindowAggregateRule.scala | 56 ++-
.../DataStreamOverAggregateRule.scala | 5 +-
.../rules/datastream/DataStreamScanRule.scala | 4 +-
.../rules/datastream/DataStreamUnionRule.scala | 3 +-
.../rules/datastream/DataStreamValuesRule.scala | 3 +-
.../table/plan/schema/DataStreamTable.scala | 14 +
.../flink/table/plan/schema/FlinkTable.scala | 7 +-
.../flink/table/plan/schema/RowSchema.scala | 152 +++++++
.../plan/schema/TimeIndicatorRelDataType.scala | 49 +++
.../apache/flink/table/runtime/MapRunner.scala | 2 +-
.../table/runtime/aggregate/AggregateUtil.scala | 190 ++++----
.../aggregate/RowTimeBoundedRangeOver.scala | 6 +-
.../aggregate/RowTimeBoundedRowsOver.scala | 2 +-
.../table/sources/DefinedTimeAttributes.scala | 47 ++
.../table/typeutils/TimeIndicatorTypeInfo.scala | 45 ++
.../flink/table/typeutils/TypeCheckUtils.scala | 5 +-
.../flink/table/validate/FunctionCatalog.scala | 10 +-
.../api/java/batch/TableEnvironmentITCase.java | 9 -
.../flink/table/TableEnvironmentTest.scala | 55 ++-
.../scala/batch/TableEnvironmentITCase.scala | 10 -
.../scala/batch/sql/WindowAggregateTest.scala | 18 +-
.../scala/batch/table/FieldProjectionTest.scala | 36 +-
.../api/scala/batch/table/GroupWindowTest.scala | 121 ++---
.../table/api/scala/stream/sql/SqlITCase.scala | 150 ++++---
.../scala/stream/sql/WindowAggregateTest.scala | 179 ++++----
.../scala/stream/table/AggregationsITCase.scala | 12 +-
.../api/scala/stream/table/CalcITCase.scala | 16 -
.../scala/stream/table/GroupWindowTest.scala | 440 ++++++-------------
.../scala/stream/table/OverWindowITCase.scala | 12 +-
.../api/scala/stream/table/OverWindowTest.scala | 101 ++---
.../GroupWindowStringExpressionTest.scala | 6 +-
.../OverWindowStringExpressionTest.scala | 16 +-
.../datastream/DataStreamAggregateITCase.scala | 22 +-
82 files changed, 2368 insertions(+), 1921 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 00cf11c..3eb2ffc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -194,7 +194,30 @@ abstract class BatchTableEnvironment(
protected def registerDataSetInternal[T](
name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](
+ dataSet.getType,
+ fields,
+ ignoreTimeAttributes = true)
+
+ // validate and extract time attributes
+ val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+
+ // don't allow proctime on batch
+ proctime match {
+ case Some(_) =>
+ throw new ValidationException(
+ "A proctime attribute is not allowed in a batch environment. " +
+ "Working with processing-time on batch would lead to non-deterministic results.")
+ case _ => // ok
+ }
+ // rowtime must not extend the schema of a batch table
+ rowtime match {
+ case Some((idx, _)) if idx >= dataSet.getType.getArity =>
+ throw new ValidationException(
+ "A rowtime attribute must be defined on an existing field in a batch environment.")
+ case _ => // ok
+ }
+
val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
registerTableInternal(name, dataSetTable)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f532c5b..d1f2fb5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -87,47 +88,6 @@ abstract class StreamTableEnvironment(
protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
/**
- * Returns field names and field positions for a given [[TypeInformation]].
- *
- * Field names are automatically extracted for
- * [[org.apache.flink.api.common.typeutils.CompositeType]].
- * The method fails if inputType is not a
- * [[org.apache.flink.api.common.typeutils.CompositeType]].
- *
- * @param inputType The TypeInformation extract the field names and positions from.
- * @tparam A The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
- : (Array[String], Array[Int]) = {
- val fieldInfo = super.getFieldInfo(inputType)
- if (fieldInfo._1.contains("rowtime")) {
- throw new TableException("'rowtime' ia a reserved field name in stream environment.")
- }
- fieldInfo
- }
-
- /**
- * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
- * [[Expression]].
- *
- * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
- * @param exprs The expressions that define the field names.
- * @tparam A The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- override protected[flink] def getFieldInfo[A](
- inputType: TypeInformation[A],
- exprs: Array[Expression])
- : (Array[String], Array[Int]) = {
- val fieldInfo = super.getFieldInfo(inputType, exprs)
- if (fieldInfo._1.contains("rowtime")) {
- throw new TableException("'rowtime' is a reserved field name in stream environment.")
- }
- fieldInfo
- }
-
- /**
* Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
* Registered tables can be referenced in SQL queries.
*
@@ -145,6 +105,7 @@ abstract class StreamTableEnvironment(
"StreamTableEnvironment")
}
}
+
/**
* Writes a [[Table]] to a [[TableSink]].
*
@@ -185,7 +146,9 @@ abstract class StreamTableEnvironment(
val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
- fieldNames
+ fieldNames,
+ None,
+ None
)
registerTableInternal(name, dataStreamTable)
}
@@ -200,15 +163,26 @@ abstract class StreamTableEnvironment(
* @tparam T The type of the [[DataStream]].
*/
protected def registerDataStreamInternal[T](
- name: String,
- dataStream: DataStream[T],
- fields: Array[Expression]): Unit = {
+ name: String,
+ dataStream: DataStream[T],
+ fields: Array[Expression])
+ : Unit = {
+
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](
+ dataStream.getType,
+ fields,
+ ignoreTimeAttributes = false)
+
+ // validate and extract time attributes
+ val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
- fieldNames
+ fieldNames,
+ rowtime,
+ proctime
)
registerTableInternal(name, dataStreamTable)
}
@@ -259,7 +233,10 @@ abstract class StreamTableEnvironment(
// 1. decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
- // 2. normalize the logical plan
+ // 2. convert time indicators
+ val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
+
+ // 3. normalize the logical plan
val normRuleSet = getNormRuleSet
val normalizedPlan = if (normRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
@@ -267,7 +244,7 @@ abstract class StreamTableEnvironment(
decorPlan
}
- // 3. optimize the logical Flink plan
+ // 4. optimize the logical Flink plan
val logicalOptRuleSet = getLogicalOptRuleSet
val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
@@ -276,7 +253,7 @@ abstract class StreamTableEnvironment(
normalizedPlan
}
- // 4. optimize the physical Flink plan
+ // 5. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
@@ -285,7 +262,7 @@ abstract class StreamTableEnvironment(
logicalPlan
}
- // 5. decorate the optimized plan
+ // 6. decorate the optimized plan
val decoRuleSet = getDecoRuleSet
val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 06c405e..4c72e8f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -52,6 +52,9 @@ import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -598,70 +601,94 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
- * [[Expression]].
+ * [[Expression]]. It does not handle time attributes but considers them in indices, if
+ * ignore flag is not false.
*
* @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
+ * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
protected[flink] def getFieldInfo[A](
- inputType: TypeInformation[A],
- exprs: Array[Expression]): (Array[String], Array[Int]) = {
+ inputType: TypeInformation[A],
+ exprs: Array[Expression],
+ ignoreTimeAttributes: Boolean)
+ : (Array[String], Array[Int]) = {
TableEnvironment.validateType(inputType)
+ val filteredExprs = if (ignoreTimeAttributes) {
+ exprs.map {
+ case ta: TimeAttribute => ta.expression
+ case e@_ => e
+ }
+ } else {
+ exprs
+ }
+
val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
- if (exprs.length != 1) {
- throw new TableException("Table of atomic type can only have a single field.")
- }
- exprs.map {
- case UnresolvedFieldReference(name) => (0, name)
+ filteredExprs.zipWithIndex flatMap {
+ case (UnresolvedFieldReference(name), idx) =>
+ if (idx > 0) {
+ throw new TableException("Table of atomic type can only have a single field.")
+ }
+ Some((0, name))
+ case (_: TimeAttribute, _) if ignoreTimeAttributes =>
+ None
case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
- exprs.zipWithIndex.map {
- case (UnresolvedFieldReference(name), idx) => (idx, name)
+ filteredExprs.zipWithIndex flatMap {
+ case (UnresolvedFieldReference(name), idx) =>
+ Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
val idx = t.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $t")
}
- (idx, name)
+ Some((idx, name))
+ case (_: TimeAttribute, _) =>
+ None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
case c: CaseClassTypeInfo[A] =>
- exprs.zipWithIndex.map {
- case (UnresolvedFieldReference(name), idx) => (idx, name)
+ filteredExprs.zipWithIndex flatMap {
+ case (UnresolvedFieldReference(name), idx) =>
+ Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
val idx = c.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $c")
}
- (idx, name)
+ Some((idx, name))
+ case (_: TimeAttribute, _) =>
+ None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
case p: PojoTypeInfo[A] =>
- exprs.map {
+ filteredExprs flatMap {
case (UnresolvedFieldReference(name)) =>
val idx = p.getFieldIndex(name)
if (idx < 0) {
throw new TableException(s"$name is not a field of type $p")
}
- (idx, name)
+ Some((idx, name))
case Alias(UnresolvedFieldReference(origName), name, _) =>
val idx = p.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $p")
}
- (idx, name)
+ Some((idx, name))
+ case _: TimeAttribute =>
+ None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
@@ -795,6 +822,42 @@ abstract class TableEnvironment(val config: TableConfig) {
Some(mapFunction)
}
+ /**
+ * Checks for at most one rowtime and proctime attribute.
+ * Returns the time attributes.
+ *
+ * @return rowtime attribute and proctime attribute
+ */
+ protected def validateAndExtractTimeAttributes(
+ fieldNames: Seq[String],
+ fieldIndices: Seq[Int],
+ exprs: Array[Expression])
+ : (Option[(Int, String)], Option[(Int, String)]) = {
+
+ var rowtime: Option[(Int, String)] = None
+ var proctime: Option[(Int, String)] = None
+
+ exprs.zipWithIndex.foreach {
+ case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+ if (rowtime.isDefined) {
+ throw new TableException(
+ "The rowtime attribute can only be defined once in a table schema.")
+ } else {
+ rowtime = Some(idx, name)
+ }
+ case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+ if (proctime.isDefined) {
+ throw new TableException(
+ "The proctime attribute can only be defined once in a table schema.")
+ } else {
+ proctime = Some(idx, name)
+ }
+ case _ =>
+ // do nothing
+ }
+
+ (rowtime, proctime)
+ }
}
/**
@@ -803,6 +866,10 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
object TableEnvironment {
+ // default names that can be used in in TableSources etc.
+ val DEFAULT_ROWTIME_ATTRIBUTE = "rowtime"
+ val DEFAULT_PROCTIME_ATTRIBUTE = "proctime"
+
/**
* Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]].
*
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index cc58ff5..6d15212 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -625,7 +625,7 @@ trait ImplicitExpressionOperations {
*/
def millis = milli
- // row interval type
+ // Row interval type
/**
* Creates an interval of rows.
@@ -634,6 +634,8 @@ trait ImplicitExpressionOperations {
*/
def rows = toRowInterval(expr)
+ // Advanced type helper functions
+
/**
* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
* returns it's value.
@@ -680,6 +682,20 @@ trait ImplicitExpressionOperations {
* @return the first and only element of an array with a single element
*/
def element() = ArrayElement(expr)
+
+ // Schema definition
+
+ /**
+ * Declares a field as the rowtime attribute for indicating, accessing, and working in
+ * Flink's event time.
+ */
+ def rowtime = RowtimeAttribute(expr)
+
+ /**
+ * Declares a field as the proctime attribute for indicating, accessing, and working in
+ * Flink's processing time.
+ */
+ def proctime = ProctimeAttribute(expr)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 5e70440..d0430c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,8 +18,8 @@
package org.apache.flink.table.api.scala
+import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize}
import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap}
/**
* Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 87dde0a..dd8265b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -20,12 +20,12 @@ package org.apache.flink.table.api
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
-import org.apache.flink.table.plan.logical.Minus
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
-import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.logical.{Minus, _}
import org.apache.flink.table.sinks.TableSink
import _root_.scala.collection.JavaConverters._
@@ -1015,13 +1015,7 @@ class WindowGroupedTable(
val projectsOnAgg = replaceAggregationsAndProperties(
fields, table.tableEnv, aggNames, propNames)
- val projectFields = (table.tableEnv, window) match {
- // event time can be arbitrary field in batch environment
- case (_: BatchTableEnvironment, w: EventTimeWindow) =>
- extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField))
- case (_, _) =>
- extractFieldReferences(fields ++ groupKeys)
- }
+ val projectFields = extractFieldReferences(fields ++ groupKeys :+ window.timeField)
new Table(table.tableEnv,
Project(
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 80260f7..11ef360 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -149,7 +149,7 @@ class OverWindowWithOrderBy(
* A window specification.
*
* Window groups rows based on time or row-count intervals. It is a general way to group the
- * elements, which is very helpful for both groupby-aggregations and over-aggregations to
+ * elements, which is very helpful for both groupBy-aggregations and over-aggregations to
* compute aggregates on groups of elements.
*
* Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
@@ -157,111 +157,73 @@ class OverWindowWithOrderBy(
*
* For finite batch tables, window provides shortcuts for time-based groupBy.
*
- * @param alias The expression of alias for this Window
*/
-abstract class Window(val alias: Expression) {
+abstract class Window(val alias: Expression, val timeField: Expression) {
/**
* Converts an API class to a logical window for planning.
*/
private[flink] def toLogicalWindow: LogicalWindow
+
}
+// ------------------------------------------------------------------------------------------------
+// Tumbling windows
+// ------------------------------------------------------------------------------------------------
+
/**
- * A window specification without alias.
+ * Tumbling window.
+ *
+ * For streaming tables you can specify grouping by a event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param size the size of the window either as time or row-count interval.
*/
-abstract class WindowWithoutAlias {
+class TumbleWithSize(size: Expression) {
/**
- * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
- * refer to. `select()` statement can access window properties such as window start or end time.
+ * Tumbling window.
*
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: Expression): Window
-
- /**
- * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
- * refer to. `select()` statement can access window properties such as window start or end time.
+ * For streaming tables you can specify grouping by a event-time or processing-time attribute.
*
- * @param alias alias for this window
- * @return this window
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param size the size of the window either as time or row-count interval.
*/
- def as(alias: String): Window = as(ExpressionParser.parseExpression(alias))
-}
-
-/**
- * A predefined specification of window on processing-time
- */
-abstract class ProcTimeWindowWithoutAlias extends WindowWithoutAlias {
+ def this(size: String) = this(ExpressionParser.parseExpression(size))
/**
* Specifies the time attribute on which rows are grouped.
*
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
+ * For streaming tables you can specify grouping by a event-time or processing-time attribute.
*
- * For batch tables, refer to a timestamp or long attribute.
+ * For batch tables you can specify grouping on a timestamp or long attribute.
*
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a predefined window on event-time
+ * @param timeField time attribute for streaming and batch tables
+ * @return a tumbling window on event-time
*/
- def on(timeField: Expression): WindowWithoutAlias
+ def on(timeField: Expression): TumbleWithSizeOnTime =
+ new TumbleWithSizeOnTime(timeField, size)
/**
* Specifies the time attribute on which rows are grouped.
*
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
+ * For streaming tables you can specify grouping by a event-time or processing-time attribute.
*
- * For batch tables, refer to a timestamp or long attribute.
+ * For batch tables you can specify grouping on a timestamp or long attribute.
*
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a predefined window on event-time
+ * @param timeField time attribute for streaming and batch tables
+ * @return a tumbling window on event-time
*/
- def on(timeField: String): WindowWithoutAlias =
+ def on(timeField: String): TumbleWithSizeOnTime =
on(ExpressionParser.parseExpression(timeField))
}
/**
- * A window operating on event-time.
- *
- * For streaming tables call on('rowtime) to specify grouping by event-time.
- * Otherwise rows are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
+ * Tumbling window on time.
*/
-abstract class EventTimeWindow(alias: Expression, val timeField: Expression) extends Window(alias)
-
-// ------------------------------------------------------------------------------------------------
-// Tumbling windows
-// ------------------------------------------------------------------------------------------------
-
-/**
- * A partial specification of a tumbling window.
- *
- * @param size the size of the window either a time or a row-count interval.
- */
-class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias {
-
- def this(size: String) = this(ExpressionParser.parseExpression(size))
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time.
- * Otherwise rows are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a predefined window on event-time
- */
- override def on(timeField: Expression): WindowWithoutAlias =
- new TumbleWithoutAlias(timeField, size)
+class TumbleWithSizeOnTime(time: Expression, size: Expression) {
/**
* Assigns an alias for this window that the following `groupBy()` and `select()` clause can
@@ -270,15 +232,9 @@ class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias {
* @param alias alias for this window
* @return a fully specified tumbling window carrying the given alias
*/
- override def as(alias: Expression) = new TumblingWindow(alias, size)
-}
-
-/**
- * A tumbling window on event-time without alias.
- */
-class TumbleWithoutAlias(
- time: Expression,
- size: Expression) extends WindowWithoutAlias {
+ def as(alias: Expression): TumbleWithSizeOnTimeWithAlias = {
+ new TumbleWithSizeOnTimeWithAlias(alias, time, size)
+ }
/**
* Assigns an alias for this window that the following `groupBy()` and `select()` clause can
@@ -287,31 +243,28 @@ class TumbleWithoutAlias(
* @param alias alias for this window
* @return a fully specified tumbling window carrying the given alias
*/
- override def as(alias: Expression): Window = new TumblingEventTimeWindow(alias, time, size)
-}
-
-/**
- * Tumbling window on processing-time.
- *
- * @param alias the alias of the window.
- * @param size the size of the window either a time or a row-count interval.
- */
-class TumblingWindow(alias: Expression, size: Expression) extends Window(alias) {
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeTumblingGroupWindow(alias, size)
+ def as(alias: String): TumbleWithSizeOnTimeWithAlias = {
+ as(ExpressionParser.parseExpression(alias))
+ }
}
/**
- * Tumbling window on event-time.
+ * Tumbling window on time with alias. Fully specifies a window.
+ *
+ * @param alias the alias of the window
+ * @param timeField the time attribute on which rows are grouped
+ * @param size the size of the window either as time or row-count interval
*/
-class TumblingEventTimeWindow(
+class TumbleWithSizeOnTimeWithAlias(
alias: Expression,
- time: Expression,
- size: Expression) extends EventTimeWindow(alias, time) {
+ timeField: Expression,
+ size: Expression)
+ extends Window(
+ alias,
+ timeField) {
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeTumblingGroupWindow(alias, time, size)
+ /**
+ * Converts this API window into a [[TumblingGroupWindow]] for planning.
+ */
+ override private[flink] def toLogicalWindow: LogicalWindow = {
+ TumblingGroupWindow(alias, timeField, size)
+ }
}
// ------------------------------------------------------------------------------------------------
@@ -319,16 +272,16 @@ class TumblingEventTimeWindow(
// ------------------------------------------------------------------------------------------------
/**
- * A partially specified sliding window.
+ * Partially specified sliding window.
*
- * @param size the size of the window either a time or a row-count interval.
+ * @param size the size of the window either as time or row-count interval.
*/
class SlideWithSize(size: Expression) {
/**
- * A partially specified sliding window.
+ * Creates a partially specified sliding window from a size expression string.
*
- * @param size the size of the window either a time or a row-count interval.
+ * @param size the size of the window either as time or row-count interval.
*/
def this(size: String) = this(ExpressionParser.parseExpression(size))
@@ -343,9 +296,9 @@ class SlideWithSize(size: Expression) {
* windows.
*
* @param slide the slide of the window either as time or row-count interval.
- * @return a predefined sliding window.
+ * @return a sliding window with size and slide; call `on` to specify the time attribute
*/
- def every(slide: Expression): SlideWithSlide = new SlideWithSlide(size, slide)
+ def every(slide: Expression): SlideWithSizeAndSlide = new SlideWithSizeAndSlide(size, slide)
/**
* Specifies the window's slide as time or row-count interval.
@@ -358,48 +311,54 @@ class SlideWithSize(size: Expression) {
* windows.
*
* @param slide the slide of the window either as time or row-count interval.
- * @return a predefined sliding window.
+ * @return a sliding window with size and slide; call `on` to specify the time attribute
*/
- def every(slide: String): WindowWithoutAlias = every(ExpressionParser.parseExpression(slide))
+ def every(slide: String): SlideWithSizeAndSlide = every(ExpressionParser.parseExpression(slide))
}
/**
- * A partially defined sliding window.
+ * Partially specified sliding window with size and slide.
+ *
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ * @param slide the slide of the window either as time or row-count interval.
*/
-class SlideWithSlide(
- size: Expression,
- slide: Expression) extends ProcTimeWindowWithoutAlias {
+class SlideWithSizeAndSlide(size: Expression, slide: Expression) {
+
/**
* Specifies the time attribute on which rows are grouped.
*
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
*
- * For batch tables, refer to a timestamp or long attribute.
+ * For batch tables you can specify grouping on a timestamp or long attribute.
*
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a predefined Sliding window on event-time.
+ * @param timeField time attribute for streaming and batch tables
+ * @return a sliding window on the given time attribute
*/
- override def on(timeField: Expression): SlideWithoutAlias =
- new SlideWithoutAlias(timeField, size, slide)
+ def on(timeField: Expression): SlideWithSizeAndSlideOnTime =
+ new SlideWithSizeAndSlideOnTime(timeField, size, slide)
/**
- * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
- * refer to. `select()` statement can access window properties such as window start or end time.
+ * Specifies the time attribute on which rows are grouped.
*
- * @param alias alias for this window
- * @return this window
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param timeField time attribute for streaming and batch tables
+ * @return a sliding window on the given time attribute
*/
- override def as(alias: Expression): Window = new SlidingWindow(alias, size, slide)
+ def on(timeField: String): SlideWithSizeAndSlideOnTime =
+ on(ExpressionParser.parseExpression(timeField))
}
/**
- * A partially defined sliding window on event-time without alias.
+ * Sliding window on a time attribute. Call `as` to assign an alias and obtain a fully
+ * specified window.
*/
-class SlideWithoutAlias(
- timeField: Expression,
- size: Expression,
- slide: Expression) extends WindowWithoutAlias {
+class SlideWithSizeAndSlideOnTime(timeField: Expression, size: Expression, slide: Expression) {
+
/**
* Assigns an alias for this window that the following `groupBy()` and `select()` clause can
* refer to. `select()` statement can access window properties such as window start or end time.
@@ -407,39 +366,40 @@ class SlideWithoutAlias(
* @param alias alias for this window
* @return a fully specified sliding window carrying the given alias
*/
- override def as(alias: Expression): Window =
- new SlidingEventTimeWindow(alias, timeField, size, slide)
-}
+ def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias = {
+ new SlideWithSizeAndSlideOnTimeWithAlias(alias, timeField, size, slide)
+ }
-/**
- * A sliding window on processing-time.
- *
- * @param alias the alias of the window.
- * @param size the size of the window either a time or a row-count interval.
- * @param slide the interval by which the window slides.
- */
-class SlidingWindow(
- alias: Expression,
- size: Expression,
- slide: Expression)
- extends Window(alias) {
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeSlidingGroupWindow(alias, size, slide)
+ /**
+ * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+ * refer to. `select()` statement can access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return a fully specified sliding window carrying the given alias
+ */
+ def as(alias: String): SlideWithSizeAndSlideOnTimeWithAlias = {
+ as(ExpressionParser.parseExpression(alias))
+ }
}
/**
- * A sliding window on event-time.
+ * Sliding window on time with alias. Fully specifies a window.
+ *
+ * @param alias the alias of the window
+ * @param timeField the time attribute on which rows are grouped
+ * @param size the size of the window either as time or row-count interval
+ * @param slide the slide of the window either as time or row-count interval
*/
-class SlidingEventTimeWindow(
+class SlideWithSizeAndSlideOnTimeWithAlias(
alias: Expression,
timeField: Expression,
size: Expression,
slide: Expression)
- extends EventTimeWindow(alias, timeField) {
+ extends Window(
+ alias,
+ timeField) {
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeSlidingGroupWindow(alias, timeField, size, slide)
+ /**
+ * Converts this API window into a [[SlidingGroupWindow]] for planning.
+ */
+ override private[flink] def toLogicalWindow: LogicalWindow = {
+ SlidingGroupWindow(alias, timeField, size, slide)
+ }
}
// ------------------------------------------------------------------------------------------------
@@ -447,42 +407,59 @@ class SlidingEventTimeWindow(
// ------------------------------------------------------------------------------------------------
/**
- * A partially defined session window.
+ * Partially specified session window.
+ *
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param gap the time interval of inactivity before a window is closed.
*/
-class SessionWithGap(gap: Expression) extends ProcTimeWindowWithoutAlias {
+class SessionWithGap(gap: Expression) {
+ /**
+ * Creates a partially specified session window from a gap expression string.
+ *
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param gap the time interval of inactivity before a window is closed.
+ */
def this(gap: String) = this(ExpressionParser.parseExpression(gap))
/**
* Specifies the time attribute on which rows are grouped.
*
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
*
- * For batch tables, refer to a timestamp or long attribute.
+ * For batch tables you can specify grouping on a timestamp or long attribute.
*
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return an on event-time session window on event-time
+ * @param timeField time attribute for streaming and batch tables
+ * @return a session window on the given time attribute
*/
- override def on(timeField: Expression): SessionWithoutAlias =
- new SessionWithoutAlias(timeField, gap)
+ def on(timeField: Expression): SessionWithGapOnTime =
+ new SessionWithGapOnTime(timeField, gap)
/**
- * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
- * refer to. `select()` statement can access window properties such as window start or end time.
+ * Specifies the time attribute on which rows are grouped.
*
- * @param alias alias for this window
- * @return this window
+ * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+ *
+ * For batch tables you can specify grouping on a timestamp or long attribute.
+ *
+ * @param timeField time attribute for streaming and batch tables
+ * @return a session window on the given time attribute
*/
- override def as(alias: Expression): Window = new SessionWindow(alias, gap)
+ def on(timeField: String): SessionWithGapOnTime =
+ on(ExpressionParser.parseExpression(timeField))
}
/**
- * A partially defined session window on event-time without alias.
+ * Session window on a time attribute. Call `as` to assign an alias and obtain a fully
+ * specified window.
*/
-class SessionWithoutAlias(
- timeField: Expression,
- gap: Expression) extends WindowWithoutAlias {
+class SessionWithGapOnTime(timeField: Expression, gap: Expression) {
+
/**
* Assigns an alias for this window that the following `groupBy()` and `select()` clause can
* refer to. `select()` statement can access window properties such as window start or end time.
@@ -490,29 +467,37 @@ class SessionWithoutAlias(
* @param alias alias for this window
* @return a fully specified session window carrying the given alias
*/
- override def as(alias: Expression): Window = new SessionEventTimeWindow(alias, timeField, gap)
-}
-
-/**
- * A session window on processing-time.
- *
- * @param gap the time interval of inactivity before a window is closed.
- */
-class SessionWindow(alias: Expression, gap: Expression) extends Window(alias) {
+ def as(alias: Expression): SessionWithGapOnTimeWithAlias = {
+ new SessionWithGapOnTimeWithAlias(alias, timeField, gap)
+ }
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeSessionGroupWindow(alias, gap)
+ /**
+ * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+ * refer to. `select()` statement can access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return a fully specified session window carrying the given alias
+ */
+ def as(alias: String): SessionWithGapOnTimeWithAlias = {
+ as(ExpressionParser.parseExpression(alias))
+ }
}
/**
- * A session window on event-time.
+ * Session window on time with alias. Fully specifies a window.
+ *
+ * @param alias the alias of the window
+ * @param timeField the time attribute on which rows are grouped
+ * @param gap the time interval of inactivity before a window is closed
*/
-class SessionEventTimeWindow(
+class SessionWithGapOnTimeWithAlias(
alias: Expression,
timeField: Expression,
gap: Expression)
- extends EventTimeWindow(alias, timeField) {
+ extends Window(
+ alias,
+ timeField) {
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeSessionGroupWindow(alias, timeField, gap)
+ /**
+ * Converts this API window into a [[SessionGroupWindow]] for planning.
+ */
+ override private[flink] def toLogicalWindow: LogicalWindow = {
+ SessionGroupWindow(alias, timeField, gap)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
index b4a3c42..2bdf360 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.calcite
import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.validate.{SqlConformance, SqlValidatorImpl}
-import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl}
/**
* This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
@@ -30,8 +30,12 @@ import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
class FlinkCalciteSqlValidator(
opTab: SqlOperatorTable,
catalogReader: CalciteCatalogReader,
- typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
- opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+ factory: JavaTypeFactory)
+ extends SqlValidatorImpl(
+ opTab,
+ catalogReader,
+ factory,
+ SqlConformanceEnum.DEFAULT) {
override def getLogicalSourceRowType(
sourceRowType: RelDataType,
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 09e3277..beb2436 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -107,7 +107,11 @@ class FlinkPlannerImpl(
// we disable automatic flattening in order to let composite types pass without modification
// we might enable it again once Calcite has better support for structured types
// root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
- root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+
+ // TableEnvironment.optimize will execute the following
+ // root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ // convert time indicators
+ // root = root.withRel(RelTimeIndicatorConverter.convert(root.rel, rexBuilder))
root
} catch {
case e: RelConversionException => throw TableException(e.getMessage)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 7762ff8..001011b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -20,25 +20,25 @@ package org.apache.flink.table.calcite
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.rel.`type`._
import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
import org.apache.flink.types.Row
-import scala.collection.mutable
import scala.collection.JavaConverters._
+import scala.collection.mutable
/**
* Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
@@ -65,6 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+ case TimeIndicatorTypeInfo.ROWTIME_INDICATOR =>
+ createRowtimeIndicatorType()
+
+ case TimeIndicatorTypeInfo.PROCTIME_INDICATOR =>
+ createProctimeIndicatorType()
+
case _ =>
createSqlType(sqlType)
}
@@ -77,23 +83,75 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
/**
+   * Creates an indicator type for processing-time, but with similar properties as SQL timestamp.
+   */
+  def createProctimeIndicatorType(): RelDataType = createTimeIndicatorType(isEventTime = false)
+
+  /**
+   * Creates an indicator type for event-time, but with similar properties as SQL timestamp.
+   */
+  def createRowtimeIndicatorType(): RelDataType = createTimeIndicatorType(isEventTime = true)
+
+  /**
+   * Wraps the SQL TIMESTAMP type of this factory into a canonized [[TimeIndicatorRelDataType]].
+   *
+   * @param isEventTime true for an event-time (rowtime) indicator, false for processing-time
+   * @return the canonical time indicator type
+   */
+  private def createTimeIndicatorType(isEventTime: Boolean): RelDataType = {
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime)
+    )
+  }
+
+  /**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
- * @return a struct type with the input fieldNames and input fieldTypes
+   * @param rowtime optional system field to indicate event-time; the index determines the
+   *                position of the field in the final record (regular fields are shifted)
+   * @param proctime optional system field to indicate processing-time; the index determines the
+   *                 position of the field in the final record (regular fields are shifted)
+   * @return a struct type with the input fieldNames, input fieldTypes, and system fields
+   * @throws TableException if a system field index lies outside the final record or if both
+   *                        system fields claim the same index
*/
- def buildRowDataType(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]])
+  def buildLogicalRowType(
+      fieldNames: Seq[String],
+      fieldTypes: Seq[TypeInformation[_]],
+      rowtime: Option[(Int, String)],
+      proctime: Option[(Int, String)])
: RelDataType = {
- val rowDataTypeBuilder = builder
- fieldNames
- .zip(fieldTypes)
- .foreach { f =>
- rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
+    val logicalRowTypeBuilder = builder
+
+    val fields = fieldNames.zip(fieldTypes)
+
+    val totalNumberOfFields = fields.length + Seq(rowtime, proctime).count(_.isDefined)
+
+    // Reject invalid positions up-front: an out-of-range index, or two time attributes sharing
+    // one index, would otherwise drop a time attribute silently and make the loop below read
+    // past the end of `fields`, failing with an IndexOutOfBoundsException.
+    (rowtime.toSeq ++ proctime.toSeq).foreach { case (idx, name) =>
+      if (idx < 0 || idx >= totalNumberOfFields) {
+        throw TableException(
+          s"Time attribute '$name' has invalid index $idx. " +
+            s"Expected an index between 0 and ${totalNumberOfFields - 1}.")
+      }
+    }
+    for ((rowtimeIdx, rowtimeName) <- rowtime; (proctimeIdx, proctimeName) <- proctime) {
+      if (rowtimeIdx == proctimeIdx) {
+        throw TableException(
+          s"Time attributes '$rowtimeName' and '$proctimeName' cannot share index $rowtimeIdx.")
+      }
+    }
+
+    // time attributes are inserted at their index; regular fields fill the remaining positions
+    var addedTimeAttributes = 0
+    for (i <- 0 until totalNumberOfFields) {
+      if (rowtime.isDefined && rowtime.get._1 == i) {
+        logicalRowTypeBuilder.add(rowtime.get._2, createRowtimeIndicatorType())
+        addedTimeAttributes += 1
+      } else if (proctime.isDefined && proctime.get._1 == i) {
+        logicalRowTypeBuilder.add(proctime.get._2, createProctimeIndicatorType())
+        addedTimeAttributes += 1
+      } else {
+        val field = fields(i - addedTimeAttributes)
+        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
}
- rowDataTypeBuilder.build
+    }
+
+    logicalRowTypeBuilder.build
}
override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
@@ -178,6 +236,7 @@ object FlinkTypeFactory {
/**
* Converts a Calcite logical record into a Flink type information.
*/
+ @deprecated("Use the RowSchema class instead because it handles both logical and physical rows.")
def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
// convert to type information
val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
@@ -188,6 +247,36 @@ object FlinkTypeFactory {
new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
}
+  /** Returns true if the given Calcite type is a processing-time indicator. */
+  def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case indicator: TimeIndicatorRelDataType => !indicator.isEventTime
+    case _ => false
+  }
+
+  /** Returns true if the given Flink type information is a processing-time indicator. */
+  def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case indicator: TimeIndicatorTypeInfo => !indicator.isEventTime
+    case _ => false
+  }
+
+  /** Returns true if the given Calcite type is an event-time (rowtime) indicator. */
+  def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case indicator: TimeIndicatorRelDataType => indicator.isEventTime
+    case _ => false
+  }
+
+  /** Returns true if the given Flink type information is an event-time (rowtime) indicator. */
+  def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case indicator: TimeIndicatorTypeInfo => indicator.isEventTime
+    case _ => false
+  }
+
+  /** Returns true if the given Calcite type is a time indicator (event-time or processing-time). */
+  def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case _: TimeIndicatorRelDataType => true
+    case _ => false
+  }
+
+  /** Returns true if the given Flink type information is a time indicator of either kind. */
+  def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case _: TimeIndicatorTypeInfo => true
+    case _ => false
+  }
+
def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
@@ -199,6 +288,15 @@ object FlinkTypeFactory {
case VARCHAR | CHAR => STRING_TYPE_INFO
case DECIMAL => BIG_DEC_TYPE_INFO
+ // time indicators
+ case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+ val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+ if (indicator.isEventTime) {
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+ } else {
+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+ }
+
// temporal types
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
new file mode 100644
index 0000000..fa2e3ee
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.functions.TimeMaterializationSqlFunction
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+
+import scala.collection.JavaConversions._
+
+/**
+ * Walks a [[RelNode]] tree and rewrites expressions that touch fields of
+ * [[TimeIndicatorRelDataType]] type. A time attribute that takes part in a calculation is
+ * materialized into a regular timestamp; plain forwarding of a time attribute is kept only in
+ * some cases.
+ *
+ * @param rexBuilder builder used to create the rewritten expressions
+ */
+class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
+
+  override def visit(project: LogicalProject): RelNode = {
+    // children are visited first, so the project's input is already converted
+    val converted = super.visit(project).asInstanceOf[LogicalProject]
+
+    val materializer = materializerFor(converted.getInput)
+    val rewrittenExprs = converted.getProjects.map(_.accept(materializer))
+    val rowType = buildRowType(converted.getRowType.getFieldNames, rewrittenExprs.map(_.getType))
+
+    converted.copy(converted.getTraitSet, converted.getInput, rewrittenExprs, rowType)
+  }
+
+  override def visit(filter: LogicalFilter): RelNode = {
+    // children are visited first, so the filter's input is already converted
+    val converted = super.visit(filter).asInstanceOf[LogicalFilter]
+
+    val condition = converted.getCondition.accept(materializerFor(converted.getInput))
+
+    converted.copy(converted.getTraitSet, converted.getInput, condition)
+  }
+
+  override def visit(union: LogicalUnion): RelNode = {
+    val converted = super.visit(union).asInstanceOf[LogicalUnion]
+
+    val rowTypes = converted.getInputs.map(_.getRowType)
+    val reference = rowTypes.head.getFieldList.map(_.getType)
+
+    // every input must agree with the first one on which fields are time indicators and,
+    // for those fields, on the indicator kind (event-time vs. processing-time)
+    val compatible = rowTypes.forall { rowType =>
+      rowType.getFieldList.map(_.getType).zip(reference).forall {
+        case (l: TimeIndicatorRelDataType, r: TimeIndicatorRelDataType) =>
+          l.isEventTime == r.isEventTime
+        case (l, r) =>
+          !isTimeIndicatorType(l) && !isTimeIndicatorType(r)
+      }
+    }
+
+    if (!compatible) {
+      throw new ValidationException(
+        "Union fields with time attributes have different types.")
+    }
+
+    converted
+  }
+
+  override def visit(other: RelNode): RelNode = other match {
+    case scan: LogicalTableFunctionScan
+        if !stack.isEmpty && stack.peek().isInstanceOf[LogicalCorrelate] =>
+      val converted = super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
+
+      // field types for the materializer come from the first input of the enclosing correlate
+      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
+      val newCall = converted.getCall.accept(materializerFor(correlate.getInputs.get(0)))
+
+      converted.copy(
+        converted.getTraitSet,
+        converted.getInputs,
+        newCall,
+        converted.getElementType,
+        converted.getRowType,
+        converted.getColumnMappings)
+
+    case _ =>
+      super.visit(other)
+  }
+
+  /** Creates a materializer that resolves input references against the row type of `input`. */
+  private def materializerFor(input: RelNode): RexTimeIndicatorMaterializer =
+    new RexTimeIndicatorMaterializer(rexBuilder, input.getRowType.getFieldList.map(_.getType))
+
+  /** Builds a record type from parallel sequences of field names and field types. */
+  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = {
+    val fields = for ((name, idx) <- names.zipWithIndex)
+      yield new RelDataTypeFieldImpl(name, idx, types(idx))
+    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
+  }
+}
+
+/**
+ * Rewrites expressions that reference time indicator fields.
+ *
+ * An input reference keeps its time indicator type only while the referenced input field is
+ * still a time indicator; otherwise it adopts the input field's type. For calls other than
+ * TUMBLE, HOP, and SESSION, every operand of time indicator type is wrapped in
+ * [[TimeMaterializationSqlFunction]], and a time indicator return type is replaced by TIMESTAMP.
+ *
+ * @param rexBuilder builder used to create the materialization calls
+ * @param input types of the input fields, indexed by field position
+ */
+class RexTimeIndicatorMaterializer(
+    private val rexBuilder: RexBuilder,
+    private val input: Seq[RelDataType])
+  extends RexShuttle {
+
+  /** Plain SQL TIMESTAMP type that replaces time indicator return types of calls. */
+  val timestamp: RelDataType = rexBuilder
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    if (isTimeIndicatorType(inputRef.getType)) {
+      val resolvedRefType = input(inputRef.getIndex)
+      if (isTimeIndicatorType(resolvedRefType)) {
+        // input is still a valid time indicator: forward the reference unchanged
+        inputRef
+      } else {
+        // input has been materialized: adopt the input field's type
+        new RexInputRef(inputRef.getIndex, resolvedRefType)
+      }
+    } else {
+      // reference to a regular field
+      super.visitInputRef(inputRef)
+    }
+  }
+
+  override def visitCall(call: RexCall): RexNode = {
+    val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+    updatedCall.getOperator match {
+      // group window functions are left untouched; their operands are not materialized
+      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+        updatedCall
+
+      case _ =>
+        materialize(updatedCall)
+    }
+  }
+
+  /**
+   * Wraps time indicator operands of `call` in a materialization call and replaces a time
+   * indicator return type by the plain [[timestamp]] type.
+   */
+  private def materialize(call: RexCall): RexNode = {
+    val materializedOperands = call.getOperands.map { o =>
+      if (isTimeIndicatorType(o.getType)) {
+        rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+      } else {
+        o
+      }
+    }
+
+    val returnType = if (isTimeIndicatorType(call.getType)) timestamp else call.getType
+    call.clone(returnType, materializedOperands)
+  }
+}
+
+object RelTimeIndicatorConverter {
+
+  /**
+   * Applies a [[RelTimeIndicatorConverter]] to the tree rooted at `rootRel`.
+   *
+   * @param rootRel root of the tree to convert
+   * @param rexBuilder builder used to create rewritten expressions
+   * @return the converted tree
+   */
+  def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode =
+    rootRel.accept(new RelTimeIndicatorConverter(rexBuilder))
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5bb3b0e..25addbc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable}
import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
@@ -42,8 +41,8 @@ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.Indenter.toISC
import org.apache.flink.table.codegen.calls.FunctionGenerator
import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, UserDefinedFunction}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
import org.apache.flink.table.runtime.TableFunctionCollector
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.types.Row
@@ -59,19 +58,18 @@ import scala.collection.mutable
* @param nullableInput input(s) can be null.
* @param input1 type information about the first input of the Function
* @param input2 type information about the second input if the Function is binary
- * @param input1PojoFieldMapping additional mapping information if input1 is a POJO (POJO types
- * have no deterministic field order).
- * @param input2PojoFieldMapping additional mapping information if input2 is a POJO (POJO types
- * have no deterministic field order).
- *
+ * @param input1FieldMapping additional mapping information for input1
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ * @param input2FieldMapping additional mapping information for input2
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
*/
class CodeGenerator(
- config: TableConfig,
- nullableInput: Boolean,
- input1: TypeInformation[_ <: Any],
- input2: Option[TypeInformation[_ <: Any]] = None,
- input1PojoFieldMapping: Option[Array[Int]] = None,
- input2PojoFieldMapping: Option[Array[Int]] = None)
+ config: TableConfig,
+ nullableInput: Boolean,
+ input1: TypeInformation[_ <: Any],
+ input2: Option[TypeInformation[_ <: Any]] = None,
+ input1FieldMapping: Option[Array[Int]] = None,
+ input2FieldMapping: Option[Array[Int]] = None)
extends RexVisitor[GeneratedExpression] {
// check if nullCheck is enabled when inputs can be null
@@ -82,7 +80,7 @@ class CodeGenerator(
// check for POJO input1 mapping
input1 match {
case pt: PojoTypeInfo[_] =>
- input1PojoFieldMapping.getOrElse(
+ input1FieldMapping.getOrElse(
throw new CodeGenException("No input mapping is specified for input1 of type POJO."))
case _ => // ok
}
@@ -90,11 +88,24 @@ class CodeGenerator(
// check for POJO input2 mapping
input2 match {
case Some(pt: PojoTypeInfo[_]) =>
- input2PojoFieldMapping.getOrElse(
+ input2FieldMapping.getOrElse(
throw new CodeGenException("No input mapping is specified for input2 of type POJO."))
case _ => // ok
}
+  // Field mapping used when accessing input1: the explicitly supplied mapping if any,
+  // otherwise the identity mapping over all fields of input1.
+  private val input1Mapping: Array[Int] =
+    input1FieldMapping.getOrElse((0 until input1.getArity).toArray)
+
+  // Field mapping used when accessing input2: the explicitly supplied mapping if any,
+  // otherwise the identity mapping over all fields of input2, or an empty mapping when
+  // the generator has no second input.
+  private val input2Mapping: Array[Int] = input2FieldMapping.getOrElse {
+    input2.map(ti => (0 until ti.getArity).toArray).getOrElse(Array[Int]())
+  }
+
/**
* A code generator for generating unary Flink
* [[org.apache.flink.api.common.functions.Function]]s with one input.
@@ -102,15 +113,15 @@ class CodeGenerator(
* @param config configuration that determines runtime behavior
* @param nullableInput input(s) can be null.
* @param input type information about the input of the Function
- * @param inputPojoFieldMapping additional mapping information necessary if input is a
- * POJO (POJO types have no deterministic field order).
+ * @param inputFieldMapping additional mapping information necessary for input
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
*/
def this(
config: TableConfig,
nullableInput: Boolean,
input: TypeInformation[Any],
- inputPojoFieldMapping: Array[Int]) =
- this(config, nullableInput, input, None, Some(inputPojoFieldMapping))
+ inputFieldMapping: Array[Int]) =
+ this(config, nullableInput, input, None, Some(inputFieldMapping))
/**
* A code generator for generating Flink input formats.
@@ -249,7 +260,7 @@ class CodeGenerator(
* @param name Class name of the function.
* Does not need to be unique but has to be a valid Java class identifier.
* @param generator The code generator instance
- * @param inputType Input row type
+ * @param physicalInputTypes Physical input row types
* @param aggregates All aggregate functions
* @param aggFields Indexes of the input fields for all aggregate functions
* @param aggMapping The mapping of aggregates to output fields
@@ -270,7 +281,7 @@ class CodeGenerator(
def generateAggregations(
name: String,
generator: CodeGenerator,
- inputType: RelDataType,
+ physicalInputTypes: Seq[TypeInformation[_]],
aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
aggFields: Array[Array[Int]],
aggMapping: Array[Int],
@@ -295,8 +306,7 @@ class CodeGenerator(
val accTypes = accTypeClasses.map(_.getCanonicalName)
// get java classes of input fields
- val javaClasses = inputType.getFieldList
- .map(f => FlinkTypeFactory.toTypeInfo(f.getType).getTypeClass)
+ val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
// get parameter lists for aggregation functions
val parameters = aggFields.map { inFields =>
val fields = for (f <- inFields) yield
@@ -844,12 +854,12 @@ class CodeGenerator(
returnType: TypeInformation[_ <: Any],
resultFieldNames: Seq[String])
: GeneratedExpression = {
- val input1AccessExprs = for (i <- 0 until input1.getArity)
- yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+ val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+ yield generateInputAccess(input1, input1Term, i, input1Mapping)
val input2AccessExprs = input2 match {
- case Some(ti) => for (i <- 0 until ti.getArity)
- yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping)
+ case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
+ yield generateInputAccess(ti, input2Term, i, input2Mapping)
case None => Seq() // add nothing
}
@@ -861,14 +871,14 @@ class CodeGenerator(
*/
def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
val input1AccessExprs = for (i <- 0 until input1.getArity)
- yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+ yield generateInputAccess(input1, input1Term, i, input1Mapping)
val input2AccessExprs = input2 match {
- case Some(ti) => for (i <- 0 until ti.getArity)
+ case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
// use generateFieldAccess instead of generateInputAccess to avoid the generated table
// function's field access code is put on the top of function body rather than
// the while loop
- yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
+ yield generateFieldAccess(ti, input2Term, i, input2Mapping)
case None => throw new CodeGenException("Type information of input2 must not be null.")
}
(input1AccessExprs, input2AccessExprs)
@@ -1123,11 +1133,11 @@ class CodeGenerator(
override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
// if inputRef index is within size of input1 we work with input1, input2 otherwise
val input = if (inputRef.getIndex < input1.getArity) {
- (input1, input1Term, input1PojoFieldMapping)
+ (input1, input1Term, input1Mapping)
} else {
(input2.getOrElse(throw new CodeGenException("Invalid input access.")),
input2Term,
- input2PojoFieldMapping)
+ input2Mapping)
}
val index = if (input._2 == input1Term) {
@@ -1146,7 +1156,7 @@ class CodeGenerator(
refExpr.resultType,
refExpr.resultTerm,
index,
- input1PojoFieldMapping)
+ input1Mapping)
val resultTerm = newName("result")
val nullTerm = newName("isNull")
@@ -1302,6 +1312,11 @@ class CodeGenerator(
throw new CodeGenException("Dynamic parameter references are not supported yet.")
override def visitCall(call: RexCall): GeneratedExpression = {
+ // time materialization is not implemented yet
+ if (call.getOperator == TimeMaterializationSqlFunction) {
+ throw new CodeGenException("Access to time attributes is not possible yet.")
+ }
+
val operands = call.getOperands.map(_.accept(this))
val resultType = FlinkTypeFactory.toTypeInfo(call.getType)
@@ -1546,7 +1561,7 @@ class CodeGenerator(
inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
- pojoFieldMapping: Option[Array[Int]])
+ fieldMapping: Array[Int])
: GeneratedExpression = {
// if input has been used before, we can reuse the code that
// has already been generated
@@ -1558,9 +1573,9 @@ class CodeGenerator(
// generate input access and unboxing if necessary
case None =>
val expr = if (nullableInput) {
- generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+ generateNullableInputFieldAccess(inputType, inputTerm, index, fieldMapping)
} else {
- generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+ generateFieldAccess(inputType, inputTerm, index, fieldMapping)
}
reusableInputUnboxingExprs((inputTerm, index)) = expr
@@ -1574,7 +1589,7 @@ class CodeGenerator(
inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
- pojoFieldMapping: Option[Array[Int]])
+ fieldMapping: Array[Int])
: GeneratedExpression = {
val resultTerm = newName("result")
val nullTerm = newName("isNull")
@@ -1582,7 +1597,7 @@ class CodeGenerator(
val fieldType = inputType match {
case ct: CompositeType[_] =>
val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
- pojoFieldMapping.get(index)
+ fieldMapping(index)
}
else {
index
@@ -1593,7 +1608,7 @@ class CodeGenerator(
}
val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
val defaultValue = primitiveDefaultValue(fieldType)
- val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+ val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, fieldMapping)
val inputCheckCode =
s"""
@@ -1617,12 +1632,12 @@ class CodeGenerator(
inputType: TypeInformation[_],
inputTerm: String,
index: Int,
- pojoFieldMapping: Option[Array[Int]])
+ fieldMapping: Array[Int])
: GeneratedExpression = {
inputType match {
case ct: CompositeType[_] =>
- val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && pojoFieldMapping.nonEmpty) {
- pojoFieldMapping.get(index)
+ val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
+ fieldMapping(index)
}
else {
index
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index d9f394b..64280c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
-import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
import scala.collection.mutable
@@ -496,15 +495,6 @@ object FunctionGenerator {
)
)
- // generate a constant for time indicator functions.
- // this is a temporary solution and will be removed when FLINK-5884 is implemented.
- case ProcTimeExtractor | EventTimeExtractor =>
- Some(new CallGenerator {
- override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {
- GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP)
- }
- })
-
// built-in scalar function
case _ =>
sqlFunctions.get((sqlOperator, operandTypes))
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 4b5781f..08abc8f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -28,10 +28,49 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
object ExpressionUtils {
+  /** Returns true if `expr` is a literal typed as a millisecond time interval. */
+  private[flink] def isTimeIntervalLiteral(expr: Expression): Boolean = expr match {
+    case Literal(_, literalType) => TimeIntervalTypeInfo.INTERVAL_MILLIS == literalType
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a literal typed as a row-count interval. */
+  private[flink] def isRowCountLiteral(expr: Expression): Boolean = expr match {
+    case Literal(_, literalType) => RowIntervalTypeInfo.INTERVAL_ROWS == literalType
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference of any time indicator type. */
+  private[flink] def isTimeAttribute(expr: Expression): Boolean = expr match {
+    case ref: ResolvedFieldReference => FlinkTypeFactory.isTimeIndicatorType(ref.resultType)
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference of a rowtime indicator type. */
+  private[flink] def isRowtimeAttribute(expr: Expression): Boolean = expr match {
+    case ref: ResolvedFieldReference => FlinkTypeFactory.isRowtimeIndicatorType(ref.resultType)
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference of a proctime indicator type. */
+  private[flink] def isProctimeAttribute(expr: Expression): Boolean = expr match {
+    case ref: ResolvedFieldReference =>
+      FlinkTypeFactory.isProctimeIndicatorType(ref.resultType)
+    case _ => false
+  }
+
+  /**
+    * Converts a millisecond time interval literal into a Flink [[FlinkTime]].
+    *
+    * @param expr expression expected to be a `Long` literal of type INTERVAL_MILLIS
+    * @return the interval as a Flink time in milliseconds
+    * @throws IllegalArgumentException if `expr` is not such a literal
+    */
+  private[flink] def toTime(expr: Expression): FlinkTime = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+      FlinkTime.milliseconds(value)
+    case _ =>
+      throw new IllegalArgumentException(
+        s"Expected a time interval literal in milliseconds but got: $expr")
+  }
+
+  /**
+    * Extracts the row count of a row interval literal.
+    *
+    * @param expr expression expected to be a `Long` literal of type INTERVAL_ROWS
+    * @return the number of rows
+    * @throws IllegalArgumentException if `expr` is not such a literal
+    */
+  private[flink] def toLong(expr: Expression): Long = expr match {
+    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case _ =>
+      throw new IllegalArgumentException(
+        s"Expected a row count interval literal but got: $expr")
+  }
+
private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index 5f7204a..13f8a11 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -110,18 +110,11 @@ case class OverCall(
val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
// assemble order by key
- val orderKey = orderBy match {
- case _: RowTime =>
- new RexFieldCollation(relBuilder.call(EventTimeExtractor), Set[SqlKind]().asJava)
- case _: ProcTime =>
- new RexFieldCollation(relBuilder.call(ProcTimeExtractor), Set[SqlKind]().asJava)
- case _ =>
- throw new ValidationException("Invalid OrderBy expression.")
- }
+ val orderKey = new RexFieldCollation(orderBy.toRexNode, Set[SqlKind]().asJava)
val orderKeys = ImmutableList.of(orderKey)
// assemble partition by keys
- val partitionKeys = partitionBy.map(_.toRexNode(relBuilder)).asJava
+ val partitionKeys = partitionBy.map(_.toRexNode).asJava
// assemble bounds
val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
@@ -249,6 +242,11 @@ case class OverCall(
return ValidationFailure("Preceding and following must be of same interval type.")
}
+ // check time field
+ if (!ExpressionUtils.isTimeAttribute(orderBy)) {
+ return ValidationFailure("Ordering must be defined on a time attribute.")
+ }
+
ValidationSuccess
}
}