You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:06 UTC
[5/5] flink git commit: [FLINK-6483] [table] Add materialization of time indicators.
[FLINK-6483] [table] Add materialization of time indicators.
This closes #3862.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b50ef4b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b50ef4b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b50ef4b8
Branch: refs/heads/master
Commit: b50ef4b8de73e0e19df154d87ea588236e3ccb43
Parents: 2480887
Author: twalthr <tw...@apache.org>
Authored: Wed May 10 10:11:34 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 08:09:54 2017 +0200
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 39 +-
.../calcite/RelTimeIndicatorConverter.scala | 404 +++++++++++++------
.../flink/table/codegen/CodeGenerator.scala | 47 ++-
.../flink/table/plan/nodes/CommonCalc.scala | 13 +-
.../table/plan/nodes/CommonCorrelate.scala | 19 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 6 +-
.../plan/nodes/dataset/DataSetCorrelate.scala | 8 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 14 +-
.../nodes/datastream/DataStreamCorrelate.scala | 23 +-
.../datastream/StreamTableSourceScan.scala | 4 +-
.../plan/nodes/logical/FlinkLogicalCalc.scala | 2 +-
.../logical/FlinkLogicalTableSourceScan.scala | 6 +-
.../DataStreamLogicalWindowAggregateRule.scala | 14 +-
.../flink/table/plan/schema/RowSchema.scala | 11 +-
.../plan/schema/StreamTableSourceTable.scala | 8 +-
.../runtime/CRowCorrelateFlatMapRunner.scala | 83 ----
.../runtime/CRowCorrelateProcessRunner.scala | 91 +++++
.../flink/table/runtime/CRowFlatMapRunner.scala | 72 ----
.../flink/table/runtime/CRowProcessRunner.scala | 80 ++++
.../table/sources/DefinedTimeAttributes.scala | 60 ---
.../table/sources/definedTimeAttributes.scala | 60 +++
.../stream/StreamTableEnvironmentTest.scala | 10 +-
.../api/scala/stream/TableSourceTest.scala | 22 +-
.../calcite/RelTimeIndicatorConverterTest.scala | 351 ++++++++++++++++
.../datastream/TimeAttributesITCase.scala | 237 +++++++++++
.../flink/table/utils/TableTestBase.scala | 5 +
26 files changed, 1267 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d68da04..994ac80 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -18,38 +18,37 @@
package org.apache.flink.table.api
-import _root_.java.util.concurrent.atomic.AtomicInteger
import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.AtomicType
-import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
-import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
-import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row
@@ -111,6 +110,17 @@ abstract class StreamTableEnvironment(
override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
checkValidTableName(name)
+ // check if event-time is enabled
+ tableSource match {
+ case dra: DefinedRowtimeAttribute if
+ execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime =>
+
+ throw TableException(
+ s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
+ s"But is: ${execEnv.getStreamTimeCharacteristic}")
+ case _ => // ok
+ }
+
tableSource match {
case streamTableSource: StreamTableSource[_] =>
registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
@@ -390,6 +400,13 @@ abstract class StreamTableEnvironment(
// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
+ // check if event-time is enabled
+ if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
+ throw TableException(
+ s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
+ s"But is: ${execEnv.getStreamTimeCharacteristic}")
+ }
+
val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
@@ -518,9 +535,9 @@ abstract class StreamTableEnvironment(
// 3. normalize the logical plan
val normRuleSet = getNormRuleSet
val normalizedPlan = if (normRuleSet.iterator().hasNext) {
- runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
+ runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
} else {
- decorPlan
+ convPlan
}
// 4. optimize the logical Flink plan
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index fa2e3ee..7ceb397 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -18,73 +18,43 @@
package org.apache.flink.table.calcite
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical._
-import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
+import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.functions.TimeMaterializationSqlFunction
import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import scala.collection.JavaConversions._
+import scala.collection.mutable
/**
* Traverses a [[RelNode]] tree and converts fields with [[TimeIndicatorRelDataType]] type. If a
* time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed in
* some cases, but not all.
*/
-class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
+class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
- override def visit(project: LogicalProject): RelNode = {
- // visit children and update inputs
- val updatedProject = super.visit(project).asInstanceOf[LogicalProject]
+ private val timestamp = rexBuilder
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- updatedProject.getInput.getRowType.getFieldList.map(_.getType))
- val newProjects = updatedProject.getProjects.map(_.accept(materializer))
-
- // copy project
- updatedProject.copy(
- updatedProject.getTraitSet,
- updatedProject.getInput,
- newProjects,
- buildRowType(updatedProject.getRowType.getFieldNames, newProjects.map(_.getType))
- )
- }
-
- override def visit(filter: LogicalFilter): RelNode = {
- // visit children and update inputs
- val updatedFilter = super.visit(filter).asInstanceOf[LogicalFilter]
-
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- updatedFilter.getInput.getRowType.getFieldList.map(_.getType))
- val newCondition = updatedFilter.getCondition.accept(materializer)
-
- // copy filter
- updatedFilter.copy(
- updatedFilter.getTraitSet,
- updatedFilter.getInput,
- newCondition
- )
- }
+ override def visit(intersect: LogicalIntersect): RelNode =
+ throw new TableException("Logical intersect in a stream environment is not supported yet.")
override def visit(union: LogicalUnion): RelNode = {
// visit children and update inputs
- val updatedUnion = super.visit(union).asInstanceOf[LogicalUnion]
+ val inputs = union.getInputs.map(_.accept(this))
// make sure that time indicator types match
- val inputTypes = updatedUnion.getInputs.map(_.getRowType)
+ val inputTypes = inputs.map(_.getRowType)
val head = inputTypes.head.getFieldList.map(_.getType)
@@ -114,101 +84,269 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
"Union fields with time attributes have different types.")
}
- updatedUnion
+ LogicalUnion.create(inputs, union.all)
}
+ override def visit(aggregate: LogicalAggregate): RelNode = convertAggregate(aggregate)
+
+ override def visit(minus: LogicalMinus): RelNode =
+ throw new TableException("Logical minus in a stream environment is not supported yet.")
+
+ override def visit(sort: LogicalSort): RelNode =
+ throw new TableException("Logical sort in a stream environment is not supported yet.")
+
+ override def visit(`match`: LogicalMatch): RelNode =
+ throw new TableException("Logical match in a stream environment is not supported yet.")
+
override def visit(other: RelNode): RelNode = other match {
- case scan: LogicalTableFunctionScan if
- stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
+
+ case uncollect: Uncollect =>
// visit children and update inputs
- val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
-
- val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
-
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
- val newCall = updatedScan.getCall.accept(materializer)
-
- // copy scan
- updatedScan.copy(
- updatedScan.getTraitSet,
- updatedScan.getInputs,
- newCall,
- updatedScan.getElementType,
- updatedScan.getRowType,
- updatedScan.getColumnMappings
- )
+ val input = uncollect.getInput.accept(this)
+ Uncollect.create(uncollect.getTraitSet, input, uncollect.withOrdinality)
+
+ case scan: LogicalTableFunctionScan =>
+ scan
+
+ case aggregate: LogicalWindowAggregate =>
+ val convAggregate = convertAggregate(aggregate)
+
+ LogicalWindowAggregate.create(
+ aggregate.getWindow,
+ aggregate.getNamedProperties,
+ convAggregate)
case _ =>
- super.visit(other)
+ throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
}
- private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = {
- val fields = names.zipWithIndex.map { case (name, idx) =>
- new RelDataTypeFieldImpl(name, idx, types(idx))
- }
- new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
+
+ override def visit(exchange: LogicalExchange): RelNode =
+ throw new TableException("Logical exchange in a stream environment is not supported yet.")
+
+ override def visit(scan: TableScan): RelNode = scan
+
+ override def visit(scan: TableFunctionScan): RelNode =
+ throw new TableException("Table function scan in a stream environment is not supported yet.")
+
+ override def visit(values: LogicalValues): RelNode = values
+
+ override def visit(filter: LogicalFilter): RelNode = {
+ // visit children and update inputs
+ val input = filter.getInput.accept(this)
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ input.getRowType.getFieldList.map(_.getType))
+
+ val condition = filter.getCondition.accept(materializer)
+ LogicalFilter.create(input, condition)
}
-}
-class RexTimeIndicatorMaterializer(
- private val rexBuilder: RexBuilder,
- private val input: Seq[RelDataType])
- extends RexShuttle {
-
- val timestamp = rexBuilder
- .getTypeFactory
- .asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
-
- override def visitInputRef(inputRef: RexInputRef): RexNode = {
- // reference is interesting
- if (isTimeIndicatorType(inputRef.getType)) {
- val resolvedRefType = input(inputRef.getIndex)
- // input is a valid time indicator
- if (isTimeIndicatorType(resolvedRefType)) {
- inputRef
- }
- // input has been materialized
- else {
- new RexInputRef(inputRef.getIndex, resolvedRefType)
- }
- }
- // reference is a regular field
- else {
- super.visitInputRef(inputRef)
+ override def visit(project: LogicalProject): RelNode = {
+ // visit children and update inputs
+ val input = project.getInput.accept(this)
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ input.getRowType.getFieldList.map(_.getType))
+
+ val projects = project.getProjects.map(_.accept(materializer))
+ val fieldNames = project.getRowType.getFieldNames
+ LogicalProject.create(input, projects, fieldNames)
+ }
+
+ override def visit(join: LogicalJoin): RelNode =
+ throw new TableException("Logical join in a stream environment is not supported yet.")
+
+ override def visit(correlate: LogicalCorrelate): RelNode = {
+ // visit children and update inputs
+ val inputs = correlate.getInputs.map(_.accept(this))
+
+ val right = inputs(1) match {
+ case scan: LogicalTableFunctionScan =>
+ // visit children and update inputs
+ val scanInputs = scan.getInputs.map(_.accept(this))
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ inputs.head.getRowType.getFieldList.map(_.getType))
+
+ val call = scan.getCall.accept(materializer)
+ LogicalTableFunctionScan.create(
+ scan.getCluster,
+ scanInputs,
+ call,
+ scan.getElementType,
+ scan.getRowType,
+ scan.getColumnMappings)
+
+ case _ =>
+ inputs(1)
}
+
+ LogicalCorrelate.create(
+ inputs.head,
+ right,
+ correlate.getCorrelationId,
+ correlate.getRequiredColumns,
+ correlate.getJoinType)
}
- override def visitCall(call: RexCall): RexNode = {
- val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+ private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
+ // visit children and update inputs
+ val input = aggregate.getInput.accept(this)
+
+ // add a project to materialize aggregation arguments/grouping keys
+
+ val refIndices = mutable.Set[Int]()
+
+ // check arguments of agg calls
+ aggregate.getAggCallList.foreach(call => if (call.getArgList.size() == 0) {
+ // count(*) has an empty argument list
+ (0 until input.getRowType.getFieldCount).foreach(refIndices.add)
+ } else {
+ // for other aggregations
+ call.getArgList.map(_.asInstanceOf[Int]).foreach(refIndices.add)
+ })
- // skip materialization for special operators
- updatedCall.getOperator match {
- case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
- return updatedCall
+ // check grouping sets
+ aggregate.getGroupSets.foreach(set =>
+ set.asList().map(_.asInstanceOf[Int]).foreach(refIndices.add)
+ )
- case _ => // do nothing
+ val needsMaterialization = refIndices.exists(idx =>
+ isTimeIndicatorType(input.getRowType.getFieldList.get(idx).getType))
+
+ // create project if necessary
+ val projectedInput = if (needsMaterialization) {
+
+ // insert or merge with input project if
+ // a time attribute is accessed and needs to be materialized
+ input match {
+
+ // merge
+ case lp: LogicalProject =>
+ val projects = lp.getProjects.zipWithIndex.map { case (expr, idx) =>
+ if (isTimeIndicatorType(expr.getType) && refIndices.contains(idx)) {
+ rexBuilder.makeCall(
+ TimeMaterializationSqlFunction,
+ expr)
+ } else {
+ expr
+ }
+ }
+
+ LogicalProject.create(
+ lp.getInput,
+ projects,
+ input.getRowType.getFieldNames)
+
+ // new project
+ case _ =>
+ val projects = input.getRowType.getFieldList.map { field =>
+ if (isTimeIndicatorType(field.getType) && refIndices.contains(field.getIndex)) {
+ rexBuilder.makeCall(
+ TimeMaterializationSqlFunction,
+ new RexInputRef(field.getIndex, field.getType))
+ } else {
+ new RexInputRef(field.getIndex, field.getType)
+ }
+ }
+
+ LogicalProject.create(
+ input,
+ projects,
+ input.getRowType.getFieldNames)
+ }
+ } else {
+ // no project necessary
+ input
}
- // materialize operands with time indicators
- val materializedOperands = updatedCall.getOperands.map { o =>
- if (isTimeIndicatorType(o.getType)) {
- rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+ // remove time indicator type as agg call return type
+ val updatedAggCalls = aggregate.getAggCallList.map { call =>
+ val callType = if (isTimeIndicatorType(call.getType)) {
+ timestamp
} else {
- o
+ call.getType
}
+ AggregateCall.create(
+ call.getAggregation,
+ call.isDistinct,
+ call.getArgList,
+ call.filterArg,
+ callType,
+ call.name)
}
- // remove time indicator return type
- if (isTimeIndicatorType(updatedCall.getType)) {
- updatedCall.clone(timestamp, materializedOperands)
- } else {
- updatedCall.clone(updatedCall.getType, materializedOperands)
+ LogicalAggregate.create(
+ projectedInput,
+ aggregate.indicator,
+ aggregate.getGroupSet,
+ aggregate.getGroupSets,
+ updatedAggCalls)
+ }
+
+ class RexTimeIndicatorMaterializer(
+ private val rexBuilder: RexBuilder,
+ private val input: Seq[RelDataType])
+ extends RexShuttle {
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ // reference is interesting
+ if (isTimeIndicatorType(inputRef.getType)) {
+ val resolvedRefType = input(inputRef.getIndex)
+ // input is a valid time indicator
+ if (isTimeIndicatorType(resolvedRefType)) {
+ inputRef
+ }
+ // input has been materialized
+ else {
+ new RexInputRef(inputRef.getIndex, resolvedRefType)
+ }
+ }
+ // reference is a regular field
+ else {
+ super.visitInputRef(inputRef)
+ }
+ }
+
+ override def visitCall(call: RexCall): RexNode = {
+ val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+ // materialize operands with time indicators
+ val materializedOperands = updatedCall.getOperator match {
+
+ // skip materialization for special operators
+ case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+ updatedCall.getOperands.toList
+
+ case _ =>
+ updatedCall.getOperands.map { o =>
+ if (isTimeIndicatorType(o.getType)) {
+ rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+ } else {
+ o
+ }
+ }
+ }
+
+ // remove time indicator return type
+ if (isTimeIndicatorType(updatedCall.getType)) {
+ updatedCall.clone(timestamp, materializedOperands)
+ } else {
+ updatedCall.clone(updatedCall.getType, materializedOperands)
+ }
}
}
}
@@ -217,6 +355,30 @@ object RelTimeIndicatorConverter {
def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode = {
val converter = new RelTimeIndicatorConverter(rexBuilder)
- rootRel.accept(converter)
+ val convertedRoot = rootRel.accept(converter)
+
+ var needsConversion = false
+
+ // materialize all remaining time indicators
+ val projects = convertedRoot.getRowType.getFieldList.map(field =>
+ if (isTimeIndicatorType(field.getType)) {
+ needsConversion = true
+ rexBuilder.makeCall(
+ TimeMaterializationSqlFunction,
+ new RexInputRef(field.getIndex, field.getType))
+ } else {
+ new RexInputRef(field.getIndex, field.getType)
+ }
+ )
+
+ // add final conversion
+ if (needsConversion) {
+ LogicalProject.create(
+ convertedRoot,
+ projects,
+ convertedRoot.getRowType.getFieldNames)
+ } else {
+ convertedRoot
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 25addbc..036889f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
@@ -238,6 +239,11 @@ class CodeGenerator(
var outRecordTerm = "out"
/**
+ * @return term of the [[ProcessFunction]]'s context
+ */
+ var contextTerm = "ctx"
+
+ /**
* @return returns if null checking is enabled
*/
def nullCheck: Boolean = config.getNullCheck
@@ -699,6 +705,17 @@ class CodeGenerator(
List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
}
+
+ // ProcessFunction
+ else if (clazz == classOf[ProcessFunction[_, _]]) {
+ val baseClass = classOf[ProcessFunction[_, _]]
+ val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+ (baseClass,
+ s"void processElement(Object _in1, " +
+ s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
+ s"org.apache.flink.util.Collector $collectorTerm)",
+ List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+ }
else {
// TODO more functions
throw new CodeGenException("Unsupported Function.")
@@ -1312,9 +1329,11 @@ class CodeGenerator(
throw new CodeGenException("Dynamic parameter references are not supported yet.")
override def visitCall(call: RexCall): GeneratedExpression = {
- // time materialization is not implemented yet
+ // special case: time materialization
if (call.getOperator == TimeMaterializationSqlFunction) {
- throw new CodeGenException("Access to time attributes is not possible yet.")
+ return generateRecordTimestamp(
+ FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType)
+ )
}
val operands = call.getOperands.map(_.accept(this))
@@ -1840,6 +1859,30 @@ class CodeGenerator(
}
}
+ private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
+ val resultTerm = newName("result")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+ val resultCode = if (isEventTime) {
+ s"""
+ |$resultTypeTerm $resultTerm;
+ |if ($contextTerm.timestamp() == null) {
+ | throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " +
+ | "TimestampAssigner is defined and the stream environment uses the EventTime time " +
+ | "characteristic.");
+ |}
+ |else {
+ | $resultTerm = $contextTerm.timestamp();
+ |}
+ |""".stripMargin
+ } else {
+ s"""
+ |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime();
+ |""".stripMargin
+ }
+ GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
+ }
+
// ----------------------------------------------------------------------------------------------
// Reusable code snippets
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index e875587..9b486e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes
import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.Function
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
@@ -30,16 +30,17 @@ import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-trait CommonCalc[T] {
+trait CommonCalc {
- private[flink] def generateFunction(
+ private[flink] def generateFunction[T <: Function](
generator: CodeGenerator,
ruleDescription: String,
inputSchema: RowSchema,
returnSchema: RowSchema,
calcProgram: RexProgram,
- config: TableConfig):
- GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+ config: TableConfig,
+ functionClass: Class[T]):
+ GeneratedFunction[T, Row] = {
val expandedExpressions = calcProgram
.getProjectList
@@ -92,7 +93,7 @@ trait CommonCalc[T] {
generator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Row, Row]],
+ functionClass,
body,
returnSchema.physicalTypeInfo)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index c95f2f7..874bea2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
@@ -36,22 +36,22 @@ import scala.collection.JavaConverters._
/**
* Join a user-defined table function
*/
-trait CommonCorrelate[T] {
+trait CommonCorrelate {
/**
* Generates the flat map function to run the user-defined table function.
*/
- private[flink] def generateFunction(
+ private[flink] def generateFunction[T <: Function](
config: TableConfig,
inputSchema: RowSchema,
udtfTypeInfo: TypeInformation[Any],
returnSchema: RowSchema,
- rowType: RelDataType,
joinType: SemiJoinType,
rexCall: RexCall,
pojoFieldMapping: Option[Array[Int]],
- ruleDescription: String):
- GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+ ruleDescription: String,
+ functionClass: Class[T]):
+ GeneratedFunction[T, Row] = {
val functionGenerator = new CodeGenerator(
config,
@@ -89,7 +89,7 @@ trait CommonCorrelate[T] {
val outerResultExpr = functionGenerator.generateResultExpression(
input1AccessExprs ++ input2NullExprs,
returnSchema.physicalTypeInfo,
- rowType.getFieldNames.asScala)
+ returnSchema.physicalFieldNames)
body +=
s"""
|boolean hasOutput = $collectorTerm.isCollected();
@@ -104,7 +104,7 @@ trait CommonCorrelate[T] {
functionGenerator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Row, Row]],
+ functionClass,
body,
returnSchema.physicalTypeInfo)
}
@@ -117,7 +117,6 @@ trait CommonCorrelate[T] {
inputSchema: RowSchema,
udtfTypeInfo: TypeInformation[Any],
returnSchema: RowSchema,
- rowType: RelDataType,
condition: Option[RexNode],
pojoFieldMapping: Option[Array[Int]])
: GeneratedCollector = {
@@ -135,7 +134,7 @@ trait CommonCorrelate[T] {
val crossResultExpr = generator.generateResultExpression(
input1AccessExprs ++ input2AccessExprs,
returnSchema.physicalTypeInfo,
- rowType.getFieldNames.asScala)
+ returnSchema.physicalFieldNames)
val collectorCode = if (condition.isEmpty) {
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e340a8c..9a9f738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.BatchTableEnvironment
@@ -46,7 +47,7 @@ class DataSetCalc(
calcProgram: RexProgram,
ruleDescription: String)
extends Calc(cluster, traitSet, input, calcProgram)
- with CommonCalc[Row]
+ with CommonCalc
with DataSetRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -95,7 +96,8 @@ class DataSetCalc(
new RowSchema(getInput.getRowType),
new RowSchema(getRowType),
calcProgram,
- config)
+ config,
+ classOf[FlatMapFunction[Row, Row]])
val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 49ead26..731d2e5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -49,7 +50,7 @@ class DataSetCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with CommonCorrelate[Row]
+ with CommonCorrelate
with DataSetRel {
override def deriveRowType() = relRowType
@@ -109,18 +110,17 @@ class DataSetCorrelate(
new RowSchema(getInput.getRowType),
udtfTypeInfo,
new RowSchema(getRowType),
- rowType,
joinType,
rexCall,
pojoFieldMapping,
- ruleDescription)
+ ruleDescription,
+ classOf[FlatMapFunction[Row, Row]])
val collector = generateCollector(
config,
new RowSchema(getInput.getRowType),
udtfTypeInfo,
new RowSchema(getRowType),
- rowType,
condition,
pojoFieldMapping)
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 5f270f6..f75efc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,11 +25,12 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexProgram
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.plan.nodes.CommonCalc
-import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
/**
@@ -45,7 +46,7 @@ class DataStreamCalc(
calcProgram: RexProgram,
ruleDescription: String)
extends Calc(cluster, traitSet, input, calcProgram)
- with CommonCalc[CRow]
+ with CommonCalc
with DataStreamRel {
override def deriveRowType(): RelDataType = schema.logicalType
@@ -101,17 +102,18 @@ class DataStreamCalc(
inputSchema,
schema,
calcProgram,
- config)
+ config,
+ classOf[ProcessFunction[CRow, CRow]])
val inputParallelism = inputDataStream.getParallelism
- val mapFunc = new CRowFlatMapRunner(
+ val processFunc = new CRowProcessRunner(
genFunction.name,
genFunction.code,
CRowTypeInfo(schema.physicalTypeInfo))
inputDataStream
- .flatMap(mapFunc)
+ .process(processFunc)
.name(calcOpName(calcProgram, getExpressionString))
// keep parallelism to ensure order of accumulate and retract messages
.setParallelism(inputParallelism)
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 5b32b10..b7165cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,12 +23,13 @@ import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.CRowCorrelateProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
/**
@@ -46,7 +47,7 @@ class DataStreamCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with CommonCorrelate[CRow]
+ with CommonCorrelate
with DataStreamRel {
override def deriveRowType() = schema.logicalType
@@ -90,7 +91,6 @@ class DataStreamCorrelate(
// we do not need to specify input type
val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
- val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
@@ -98,37 +98,36 @@ class DataStreamCorrelate(
val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
- val flatMap = generateFunction(
+ val process = generateFunction(
config,
inputSchema,
udtfTypeInfo,
schema,
- getRowType,
joinType,
rexCall,
pojoFieldMapping,
- ruleDescription)
+ ruleDescription,
+ classOf[ProcessFunction[CRow, CRow]])
val collector = generateCollector(
config,
inputSchema,
udtfTypeInfo,
schema,
- getRowType,
condition,
pojoFieldMapping)
- val mapFunc = new CRowCorrelateFlatMapRunner(
- flatMap.name,
- flatMap.code,
+ val processFunc = new CRowCorrelateProcessRunner(
+ process.name,
+ process.code,
collector.name,
collector.code,
- CRowTypeInfo(flatMap.returnType))
+ CRowTypeInfo(process.returnType))
val inputParallelism = inputDS.getParallelism
inputDS
- .flatMap(mapFunc)
+ .process(processFunc)
// preserve input parallelism to ensure that acc and retract messages remain in order
.setParallelism(inputParallelism)
.name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 51e609f..72ecac5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -49,7 +49,7 @@ class StreamTableSourceScan(
val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+ case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
@@ -57,7 +57,7 @@ class StreamTableSourceScan(
}
val proctime = tableSource match {
- case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+ case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
index 0ca079e..ec90392 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -34,7 +34,7 @@ class FlinkLogicalCalc(
calcProgram: RexProgram)
extends Calc(cluster, traitSet, input, calcProgram)
with FlinkLogicalRel
- with CommonCalc[Any] {
+ with CommonCalc {
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new FlinkLogicalCalc(cluster, traitSet, child, program)
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index a2777ec..3ae949e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
import scala.collection.JavaConverters._
@@ -54,7 +54,7 @@ class FlinkLogicalTableSourceScan(
val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+ case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
@@ -62,7 +62,7 @@ class FlinkLogicalTableSourceScan(
}
val proctime = tableSource match {
- case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+ case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 28efcf5..d57d4cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.api.{TableException, Window}
import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference}
import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -68,10 +68,12 @@ class DataStreamLogicalWindowAggregateRule
case _ => throw new TableException("Only constant window descriptors are supported.")
}
- def getOperandAsTimeIndicator(call: RexCall, idx: Int): String =
+ def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference =
call.getOperands.get(idx) match {
case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) =>
- rowType.getFieldList.get(v.getIndex).getName
+ ResolvedFieldReference(
+ rowType.getFieldList.get(v.getIndex).getName,
+ FlinkTypeFactory.toTypeInfo(v.getType))
case _ =>
throw new TableException("Window can only be defined over a time attribute column.")
}
@@ -82,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule
val interval = getOperandAsLong(windowExpr, 1)
val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(UnresolvedFieldReference(time)).as("w$")
+ w.on(time).as("w$")
case SqlStdOperatorTable.HOP =>
val time = getOperandAsTimeIndicator(windowExpr, 0)
@@ -91,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule
.over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
.every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(UnresolvedFieldReference(time)).as("w$")
+ w.on(time).as("w$")
case SqlStdOperatorTable.SESSION =>
val time = getOperandAsTimeIndicator(windowExpr, 0)
val gap = getOperandAsLong(windowExpr, 1)
val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(UnresolvedFieldReference(time)).as("w$")
+ w.on(time).as("w$")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
index b42be82..ccbe44d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -20,11 +20,12 @@ package org.apache.flink.table.plan.schema
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType}
import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.TimeMaterializationSqlFunction
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
@@ -76,6 +77,14 @@ class RowSchema(private val logicalRowType: RelDataType) {
override def visitInputRef(inputRef: RexInputRef): RexNode = {
new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
}
+
+ override def visitCall(call: RexCall): RexNode = call.getOperator match {
+ // for now we leave time indicator materialization calls unchanged;
+ // the index of the contained input reference becomes invalid, but
+ // right now we are only interested in the type of that input reference
+ case TimeMaterializationSqlFunction => call
+ case _ => super.visitCall(call)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index 75deca5..fa15288 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -19,10 +19,10 @@
package org.apache.flink.table.plan.schema
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
class StreamTableSourceTable[T](
override val tableSource: TableSource[T],
@@ -39,7 +39,7 @@ class StreamTableSourceTable[T](
val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+ case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
@@ -47,7 +47,7 @@ class StreamTableSourceTable[T](
}
val proctime = tableSource match {
- case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+ case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
deleted file mode 100644
index ff3821a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.functions.util.FunctionUtils
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
- */
-class CRowCorrelateFlatMapRunner(
- flatMapName: String,
- flatMapCode: String,
- collectorName: String,
- collectorCode: String,
- @transient var returnType: TypeInformation[CRow])
- extends RichFlatMapFunction[CRow, CRow]
- with ResultTypeQueryable[CRow]
- with Compiler[Any] {
-
- val LOG: Logger = LoggerFactory.getLogger(this.getClass)
-
- private var function: FlatMapFunction[Row, Row] = _
- private var collector: TableFunctionCollector[_] = _
- private var cRowWrapper: CRowWrappingCollector = _
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
- LOG.debug("Instantiating TableFunctionCollector.")
- collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
- this.cRowWrapper = new CRowWrappingCollector()
-
- LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode")
- val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode)
- val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
- LOG.debug("Instantiating FlatMapFunction.")
- function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
- FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
- FunctionUtils.openFunction(function, parameters)
- }
-
- override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
- cRowWrapper.out = out
- cRowWrapper.setChange(in.change)
-
- collector.setCollector(cRowWrapper)
- collector.setInput(in.row)
- collector.reset()
-
- function.flatMap(in.row, cRowWrapper)
- }
-
- override def getProducedType: TypeInformation[CRow] = returnType
-
- override def close(): Unit = {
- FunctionUtils.closeFunction(function)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
new file mode 100644
index 0000000..4f0a785
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * A CorrelateProcessRunner with [[CRow]] input and [[CRow]] output.
+ *
+ * The runner receives the source code of a generated process function and of a generated
+ * [[TableFunctionCollector]] as strings. In `open()` both are compiled with the user code
+ * class loader; the collector is instantiated first and passed to the one-argument
+ * constructor of the process function, which is then given the runtime context and opened.
+ *
+ * For every element, `processElement()` points the [[CRowWrappingCollector]] at the
+ * downstream collector, hands it the change flag of the input [[CRow]], prepares the table
+ * function collector (`setCollector`, `setInput`, `reset`) and finally calls the generated
+ * function with the unwrapped row. The context is passed on via an `asInstanceOf` cast to
+ * `ProcessFunction[Row, Row]#Context`.
+ *
+ * `close()` closes the generated function; `getProducedType` returns `returnType`.
+ *
+ * NOTE(review): the wrapping collector presumably re-wraps emitted rows into [[CRow]]s
+ * using the change flag set before the call -- confirm against CRowWrappingCollector.
+ *
+ * @param processName name handed to the compiler for the generated process function
+ * @param processCode source code of the generated process function
+ * @param collectorName name handed to the compiler for the generated table function collector
+ * @param collectorCode source code of the generated table function collector
+ * @param returnType type information returned by `getProducedType`
+ */
+class CRowCorrelateProcessRunner(
+ processName: String,
+ processCode: String,
+ collectorName: String,
+ collectorCode: String,
+ @transient var returnType: TypeInformation[CRow])
+ extends ProcessFunction[CRow, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[Any] {
+
+ val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+ private var function: ProcessFunction[Row, Row] = _
+ private var collector: TableFunctionCollector[_] = _
+ private var cRowWrapper: CRowWrappingCollector = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
+ LOG.debug("Instantiating TableFunctionCollector.")
+ collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+ this.cRowWrapper = new CRowWrappingCollector()
+
+ LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
+ val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
+ val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
+ LOG.debug("Instantiating ProcessFunction.")
+ function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+ FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+ FunctionUtils.openFunction(function, parameters)
+ }
+
+ override def processElement(
+ in: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow])
+ : Unit = {
+
+ cRowWrapper.out = out
+ cRowWrapper.setChange(in.change)
+
+ collector.setCollector(cRowWrapper)
+ collector.setInput(in.row)
+ collector.reset()
+
+ function.processElement(
+ in.row,
+ ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+ cRowWrapper)
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+
+ override def close(): Unit = {
+ FunctionUtils.closeFunction(function)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
deleted file mode 100644
index 9701cb9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.util.FunctionUtils
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-/**
- * FlatMapRunner with [[CRow]] input and [[CRow]] output.
- */
-class CRowFlatMapRunner(
- name: String,
- code: String,
- @transient var returnType: TypeInformation[CRow])
- extends RichFlatMapFunction[CRow, CRow]
- with ResultTypeQueryable[CRow]
- with Compiler[FlatMapFunction[Row, Row]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: FlatMapFunction[Row, Row] = _
- private var cRowWrapper: CRowWrappingCollector = _
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating FlatMapFunction.")
- function = clazz.newInstance()
- FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
- FunctionUtils.openFunction(function, parameters)
-
- this.cRowWrapper = new CRowWrappingCollector()
- }
-
- override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
- cRowWrapper.out = out
- cRowWrapper.setChange(in.change)
- function.flatMap(in.row, cRowWrapper)
- }
-
- override def getProducedType: TypeInformation[CRow] = returnType
-
- override def close(): Unit = {
- FunctionUtils.closeFunction(function)
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
new file mode 100644
index 0000000..cef62a5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+ * ProcessRunner with [[CRow]] input and [[CRow]] output.
+ */
+class CRowProcessRunner(
+ name: String,
+ code: String,
+ @transient var returnType: TypeInformation[CRow])
+ extends ProcessFunction[CRow, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[ProcessFunction[Row, Row]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var function: ProcessFunction[Row, Row] = _
+ private var cRowWrapper: CRowWrappingCollector = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating ProcessFunction.")
+ function = clazz.newInstance()
+ FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+ FunctionUtils.openFunction(function, parameters)
+
+ this.cRowWrapper = new CRowWrappingCollector()
+ }
+
+ override def processElement(
+ in: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow])
+ : Unit = {
+
+ cRowWrapper.out = out
+ cRowWrapper.setChange(in.change)
+ function.processElement(
+ in.row,
+ ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+ cRowWrapper)
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+
+ override def close(): Unit = {
+ FunctionUtils.closeFunction(function)
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
deleted file mode 100644
index 6d87663..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-/**
- * Defines a logical event-time attribute for a [[TableSource]].
- * The event-time attribute can be used for indicating, accessing, and working with Flink's
- * event-time.
- *
- * A [[TableSource]] that implements this interface defines the name of
- * the event-time attribute. The attribute will be added to the schema of the
- * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
- */
-trait DefinedRowTimeAttribute {
-
- /**
- * Defines a name of the event-time attribute that represents Flink's
- * event-time. Null if no rowtime should be available.
- *
- * The field will be appended to the schema provided by the [[TableSource]].
- */
- def getRowtimeAttribute: String
-}
-
-/**
- * Defines a logical processing-time attribute for a [[TableSource]].
- * The processing-time attribute can be used for indicating, accessing, and working with Flink's
- * processing-time.
- *
- * A [[TableSource]] that implements this interface defines the name of
- * the processing-time attribute. The attribute will be added to the schema of the
- * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
- */
-trait DefinedProcTimeAttribute {
-
- /**
- * Defines a name of the processing-time attribute that represents Flink's
- * processing-time. Null if no rowtime should be available.
- *
- * The field will be appended to the schema provided by the [[TableSource]].
- */
- def getProctimeAttribute: String
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
new file mode 100644
index 0000000..d381115
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+ * Defines a logical event-time attribute for a [[TableSource]].
+ * The event-time attribute can be used for indicating, accessing, and working with Flink's
+ * event-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the event-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+ */
+trait DefinedRowtimeAttribute {
+
+ /**
+ * Defines the name of the event-time attribute that represents Flink's
+ * event-time. Null if no rowtime should be available.
+ *
+ * The field will be appended to the schema provided by the [[TableSource]].
+ */
+ def getRowtimeAttribute: String
+}
+
+/**
+ * Defines a logical processing-time attribute for a [[TableSource]].
+ * The processing-time attribute can be used for indicating, accessing, and working with Flink's
+ * processing-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the processing-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+ */
+trait DefinedProctimeAttribute {
+
+ /**
+ * Defines the name of the processing-time attribute that represents Flink's
+ * processing-time. Null if no proctime should be available.
+ *
+ * The field will be appended to the schema provided by the [[TableSource]].
+ */
+ def getProctimeAttribute: String
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index e9384c7..7797f22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -19,18 +19,16 @@
package org.apache.flink.table.api.scala.stream
import java.lang.{Integer => JInt, Long => JLong}
-import java.util.Collections
-import java.util.{List => JList}
import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
import org.mockito.Mockito.{mock, when}
@@ -151,7 +149,9 @@ class StreamTableEnvironmentTest extends TableTestBase {
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
- val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv]))
+ val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+ when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+ val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
.asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 18066c9..cda90f7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
import org.apache.flink.types.Row
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
@Test
def testRowTimeTableSourceSimple(): Unit = {
val util = streamTestUtil()
- util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+ util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
@@ -43,7 +43,7 @@ class TableSourceTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
"StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
- term("select", "addTime", "id", "name", "val")
+ term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val")
)
util.verifyTable(t, expected)
}
@@ -51,7 +51,7 @@ class TableSourceTest extends TableTestBase {
@Test
def testRowTimeTableSourceGroupWindow(): Unit = {
val util = streamTestUtil()
- util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+ util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
val t = util.tEnv.scan("rowTimeT")
.filter("val > 100")
@@ -82,7 +82,7 @@ class TableSourceTest extends TableTestBase {
@Test
def testProcTimeTableSourceSimple(): Unit = {
val util = streamTestUtil()
- util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+ util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
@@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
"StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
- term("select", "pTime", "id", "name", "val")
+ term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val")
)
util.verifyTable(t, expected)
}
@@ -98,7 +98,7 @@ class TableSourceTest extends TableTestBase {
@Test
def testProcTimeTableSourceOverWindow(): Unit = {
val util = streamTestUtil()
- util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+ util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
val t = util.tEnv.scan("procTimeT")
.window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
@@ -123,8 +123,8 @@ class TableSourceTest extends TableTestBase {
}
}
-class TestRowTimeSource(timeField: String)
- extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+class TestRowtimeSource(timeField: String)
+ extends StreamTableSource[Row] with DefinedRowtimeAttribute {
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
@@ -137,8 +137,8 @@ class TestRowTimeSource(timeField: String)
}
}
-class TestProcTimeSource(timeField: String)
- extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+class TestProctimeSource(timeField: String)
+ extends StreamTableSource[Row] with DefinedProctimeAttribute {
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???