You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/14 01:21:51 UTC
[flink] branch master updated: [FLINK-12374][table-planner-blink]
Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan
to StreamTransformation. (#8407)
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2826ff8 [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
2826ff8 is described below
commit 2826ff80c4b056d7af589649238b3acabca43837
Author: Jing Zhang <be...@126.com>
AuthorDate: Tue May 14 09:21:39 2019 +0800
[FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
---
.../PlannerResolvedFieldReference.scala | 26 ++
.../logical/FlinkLogicalTableSourceScan.scala | 15 +-
.../physical/batch/BatchExecTableSourceScan.scala | 62 +++-
.../stream/StreamExecTableSourceScan.scala | 148 +++++++++-
.../apache/flink/table/plan/util/ScanUtil.scala | 2 +-
.../flink/table/sources/TableSourceUtil.scala | 321 ++++++++++++++++++++-
.../table/sources/definedTimeAttributes.scala | 95 ++++++
.../table/sources/tsextractors/ExistingField.scala | 108 +++++++
.../sources/wmstrategies/AscendingTimestamps.scala | 49 ++++
.../sources/wmstrategies/watermarkStrategies.scala | 81 ++++++
.../table/runtime/batch/sql/TableScanITCase.scala | 119 ++++++++
.../table/runtime/stream/sql/TableScanITCase.scala | 144 +++++++++
.../apache/flink/table/util/testTableSources.scala | 106 +++++++
13 files changed, 1254 insertions(+), 22 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
new file mode 100644
index 0000000..a3406f8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class PlannerResolvedFieldReference(
+ name: String,
+ resultType: TypeInformation[_],
+ fieldIndex: Int) extends ResolvedFieldReference
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index cb94b3a..7b2b7a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.plan.nodes.logical
-import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
@@ -49,7 +48,9 @@ class FlinkLogicalTableSourceScan(
extends TableScan(cluster, traitSet, relOptTable)
with FlinkLogicalRel {
- val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+ lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
+
+ private lazy val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
def copy(
traitSet: RelTraitSet,
@@ -63,15 +64,7 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
- tableSource match {
- case s: StreamTableSource[_] =>
- TableSourceUtil.getRelDataType(s, None, streaming = true, flinkTypeFactory)
- case _: BatchTableSource[_] =>
- flinkTypeFactory.buildLogicalRowType(
- tableSource.getTableSchema, isStreaming = Option.apply(false))
- case _ => throw new TableException("Unknown TableSource type.")
- }
+ tableSourceTable.getRowType(flinkTypeFactory)
}
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 93acefa..ee39511 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -18,18 +18,23 @@
package org.apache.flink.table.plan.nodes.physical.batch
+import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.plan.util.ScanUtil
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
import java.util
@@ -75,7 +80,58 @@ class BatchExecTableSourceScan(
override def translateToPlanInternal(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
- throw new TableException("Implements this")
+ val config = tableEnv.getConfig
+ val bts = tableSource.asInstanceOf[BatchTableSource[_]]
+ val inputTransform = bts.getBoundedStream(tableEnv.streamEnv).getTransformation
+
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = false,
+ None)
+
+ // check that declared and actual type of table source DataStream are identical
+ if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
+ createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
+ throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+ s"returned a DataSet of type ${inputTransform.getOutputType} that does not match with " +
+ s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
+ s"method. Please validate the implementation of the TableSource.")
+ }
+
+ // get expression to extract rowtime attribute
+ val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+ tableSource,
+ None,
+ cluster,
+ tableEnv.getRelBuilder,
+ Types.SQL_TIMESTAMP
+ )
+ if (needInternalConversion) {
+ ScanUtil.convertToInternalRow(
+ CodeGeneratorContext(config),
+ inputTransform.asInstanceOf[StreamTransformation[Any]],
+ fieldIndexes,
+ tableSource.getReturnType,
+ getRowType,
+ getTable.getQualifiedName,
+ config,
+ rowtimeExpression)
+ } else {
+ inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
+ }
+
}
+ def needInternalConversion: Boolean = {
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = false,
+ None)
+ ScanUtil.hasTimeAttributeField(fieldIndexes) ||
+ ScanUtil.needsConversion(
+ tableSource.getReturnType,
+ TypeExtractor.createTypeInfo(
+ tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
+ .getTypeClass.asInstanceOf[Class[_]])
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 4b2a8aa..096604b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -18,17 +18,28 @@
package org.apache.flink.table.plan.nodes.physical.stream
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException, Types}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource, TableSourceUtil}
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.codegen.OperatorCodeGenerator._
+import org.apache.flink.table.plan.util.ScanUtil
+import org.apache.flink.table.runtime.AbstractProcessStreamOperator
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
import java.util
@@ -79,6 +90,137 @@ class StreamExecTableSourceScan(
override protected def translateToPlanInternal(
tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
- throw new TableException("Implements this")
+ val config = tableEnv.getConfig
+ val sts = tableSource.asInstanceOf[StreamTableSource[_]]
+ val inputTransform = sts.getDataStream(tableEnv.execEnv).getTransformation
+
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = true,
+ None)
+
+ // check that declared and actual type of table source DataStream are identical
+ if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
+ createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
+ throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+ s"returned a DataStream of type ${inputTransform.getOutputType} that does not match with " +
+ s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
+ s"method. Please validate the implementation of the TableSource.")
+ }
+
+ // get expression to extract rowtime attribute
+ val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+ tableSource,
+ None,
+ cluster,
+ tableEnv.getRelBuilder,
+ Types.SQL_TIMESTAMP
+ )
+
+ val streamTransformation = if (needInternalConversion) {
+ // extract time if the index is -1 or -2.
+ val (extractElement, resetElement) =
+ if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
+ (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
+ } else {
+ ("", "")
+ }
+ val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
+ classOf[AbstractProcessStreamOperator[BaseRow]])
+ ScanUtil.convertToInternalRow(
+ ctx,
+ inputTransform.asInstanceOf[StreamTransformation[Any]],
+ fieldIndexes,
+ tableSource.getReturnType,
+ getRowType,
+ getTable.getQualifiedName,
+ config,
+ rowtimeExpression,
+ beforeConvert = extractElement,
+ afterConvert = resetElement)
+ } else {
+ inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
+ }
+
+ val ingestedTable = new DataStream(tableEnv.execEnv, streamTransformation)
+
+ // generate watermarks for rowtime indicator
+ val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+ TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)
+
+ val withWatermarks = if (rowtimeDesc.isDefined) {
+ val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+ val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+ watermarkStrategy match {
+ case p: PeriodicWatermarkAssigner =>
+ val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ case p: PunctuatedWatermarkAssigner =>
+ val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ case _: PreserveWatermarks =>
+ // The watermarks have already been provided by the underlying DataStream.
+ ingestedTable
+ }
+ } else {
+ // No need to generate watermarks if no rowtime attribute is specified.
+ ingestedTable
+ }
+
+ withWatermarks.getTransformation
+ }
+
+ def needInternalConversion: Boolean = {
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = true,
+ None)
+ ScanUtil.hasTimeAttributeField(fieldIndexes) ||
+ ScanUtil.needsConversion(
+ tableSource.getReturnType,
+ TypeExtractor.createTypeInfo(
+ tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
+ .getTypeClass.asInstanceOf[Class[_]])
+ }
+}
+
+/**
+ * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PeriodicWatermarkAssignerWrapper(
+ timeFieldIdx: Int,
+ assigner: PeriodicWatermarkAssigner)
+ extends AssignerWithPeriodicWatermarks[BaseRow] {
+
+ override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+ override def extractTimestamp(row: BaseRow, previousElementTimestamp: Long): Long = {
+ val timestamp: Long = row.getLong(timeFieldIdx)
+ assigner.nextTimestamp(timestamp)
+ 0L
+ }
+}
+
+/**
+ * Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PunctuatedWatermarkAssignerWrapper(
+ timeFieldIdx: Int,
+ assigner: PunctuatedWatermarkAssigner)
+ extends AssignerWithPunctuatedWatermarks[BaseRow] {
+
+ override def checkAndGetNextWatermark(row: BaseRow, ts: Long): Watermark = {
+ val timestamp: Long = row.getLong(timeFieldIdx)
+ assigner.getWatermark(row, timestamp)
+ }
+
+ override def extractTimestamp(element: BaseRow, previousElementTimestamp: Long): Long = {
+ 0L
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
index f38604b..a1c85d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
@@ -77,7 +77,7 @@ object ScanUtil {
val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
internalInType match {
case rt: RowType => (convertFunc, rt)
- case _ => ((record: String) => s"$GENERIC_ROW.wrap(${convertFunc(record)})",
+ case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
new RowType(internalInType))
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 985cc37..f6b22f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -18,15 +18,114 @@
package org.apache.flink.table.sources
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.`type`.InternalType
+import org.apache.flink.table.`type`.InternalTypes.{BYTE, PROCTIME_BATCH_MARKER, PROCTIME_INDICATOR, PROCTIME_STREAM_MARKER, ROWTIME_BATCH_MARKER, ROWTIME_INDICATOR, ROWTIME_STREAM_MARKER}
+import org.apache.flink.table.expressions.{BuiltInFunctionDefinitions, CallExpression, PlannerResolvedFieldReference, ResolvedFieldReference, RexNodeConverter, TypeLiteralExpression}
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
/** Util class for [[TableSource]]. */
object TableSourceUtil {
/**
+ * Computes the indices that map the input type of the DataStream to the schema of the table.
+ *
+ * The mapping is based on the field names and fails if a table field cannot be
+ * mapped to a field of the input type.
+ *
+ * @param tableSource The table source for which the table schema is mapped to the input type.
+ * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise.
+ * @param selectedFields The indexes of the table schema fields for which a mapping is
+ * computed. If None, a mapping for all fields is computed.
+ * @return An index mapping from input type to table schema.
+ */
+ def computeIndexMapping(
+ tableSource: TableSource[_],
+ isStreamTable: Boolean,
+ selectedFields: Option[Array[Int]]): Array[Int] = {
+
+ val tableSchema = tableSource.getTableSchema
+
+ // get names of selected fields
+ val tableFieldNames = if (selectedFields.isDefined) {
+ val names = tableSchema.getFieldNames
+ selectedFields.get.map(names(_))
+ } else {
+ tableSchema.getFieldNames
+ }
+
+ // get types of selected fields
+ val tableFieldTypes = if (selectedFields.isDefined) {
+ val types = tableSchema.getFieldTypes
+ selectedFields.get.map(types(_))
+ } else {
+ tableSchema.getFieldTypes
+ }
+
+ // get rowtime and proctime attributes
+ val rowtimeAttributes = getRowtimeAttributes(tableSource)
+ val proctimeAttributes = getProctimeAttribute(tableSource)
+
+ // compute mapping of selected fields and time attributes
+ val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map {
+ case (Types.SQL_TIMESTAMP, name: String)
+ if proctimeAttributes.contains(name) =>
+ if (isStreamTable) {
+ PROCTIME_STREAM_MARKER
+ } else {
+ PROCTIME_BATCH_MARKER
+ }
+ case (Types.SQL_TIMESTAMP, name: String)
+ if rowtimeAttributes.contains(name) =>
+ if (isStreamTable) {
+ ROWTIME_STREAM_MARKER
+ } else {
+ ROWTIME_BATCH_MARKER
+ }
+ case (t: TypeInformation[_], name) =>
+ // check if field is registered as time indicator
+ if (proctimeAttributes.contains(name)) {
+ throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+ s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+ }
+ if (rowtimeAttributes.contains(name)) {
+ throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+ s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+ }
+
+ val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
+ // validate that mapped fields are of the same type
+ if (tpe != t) {
+ throw new ValidationException(s"Type $t of table field '$name' does not " +
+ s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
+ }
+ idx
+ }
+ val inputType = tableSource.getReturnType
+
+ // ensure that only one field is mapped to an atomic type
+ if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
+ throw new ValidationException(
+ s"More than one table field matched to atomic input type $inputType.")
+ }
+
+ mapping
+ }
+
+ /**
* Returns the Calcite schema of a [[TableSource]].
*
* @param tableSource The [[TableSource]] for which the Calcite schema is generated.
@@ -42,13 +141,29 @@ object TableSourceUtil {
typeFactory: FlinkTypeFactory): RelDataType = {
val fieldNames = tableSource.getTableSchema.getFieldNames
- val fieldTypes = tableSource.getTableSchema.getFieldTypes
+ var fieldTypes = tableSource.getTableSchema.getFieldTypes
.map(TypeConverters.createInternalTypeFromTypeInfo)
// TODO get fieldNullables from TableSchema
- val fieldNullables = fieldTypes.map(_ => true)
+ var fieldNullables = fieldTypes.map(_ => true)
- // TODO supports time attributes after Expression is ready
+ if (streaming) {
+ // adjust the type of time attributes for streaming tables
+ val rowtimeAttributes = getRowtimeAttributes(tableSource)
+ val proctimeAttributes = getProctimeAttribute(tableSource)
+ // patch rowtime fields with time indicator type
+ rowtimeAttributes.foreach { rowtimeField =>
+ val idx = fieldNames.indexOf(rowtimeField)
+ fieldTypes = fieldTypes.patch(idx, Seq(ROWTIME_INDICATOR), 1)
+ fieldNullables = fieldNullables.patch(idx, Seq(false), 1)
+ }
+ // patch proctime field with time indicator type
+ proctimeAttributes.foreach { proctimeField =>
+ val idx = fieldNames.indexOf(proctimeField)
+ fieldTypes = fieldTypes.patch(idx, Seq(PROCTIME_INDICATOR), 1)
+ fieldNullables = fieldNullables.patch(idx, Seq(false), 1)
+ }
+ }
val (selectedFieldNames, selectedFieldTypes, selectedFieldNullables) =
if (selectedFields.isDefined) {
// filter field names and types by selected fields
@@ -57,9 +172,207 @@ object TableSourceUtil {
selectedFields.get.map(fieldTypes(_)),
selectedFields.get.map(fieldNullables(_)))
} else {
- (fieldNames, fieldTypes, fieldNullables)
+ (fieldNames, fieldTypes, fieldNullables)
}
typeFactory.buildRelDataType(selectedFieldNames, selectedFieldTypes, selectedFieldNullables)
}
+ /**
+ * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
+ *
+ * @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is
+ * returned.
+ * @param selectedFields The fields which are selected from the [[TableSource]].
+ * If None, all fields are selected.
+ * @return The [[RowtimeAttributeDescriptor]] of the [[TableSource]].
+ */
+ def getRowtimeAttributeDescriptor(
+ tableSource: TableSource[_],
+ selectedFields: Option[Array[Int]]): Option[RowtimeAttributeDescriptor] = {
+
+ tableSource match {
+ case r: DefinedRowtimeAttributes =>
+ val descriptors = r.getRowtimeAttributeDescriptors
+ if (descriptors.size() == 0) {
+ None
+ } else if (descriptors.size > 1) {
+ throw new ValidationException("Table with has more than a single rowtime attribute..")
+ } else {
+ // exactly one rowtime attribute descriptor
+ if (selectedFields.isEmpty) {
+ // all fields are selected.
+ Some(descriptors.get(0))
+ } else {
+ val descriptor = descriptors.get(0)
+ // look up index of row time attribute in schema
+ val fieldIdx = tableSource.getTableSchema.getFieldNames.indexOf(
+ descriptor.getAttributeName)
+ // is field among selected fields?
+ if (selectedFields.get.contains(fieldIdx)) {
+ Some(descriptor)
+ } else {
+ None
+ }
+ }
+ }
+ case _ => None
+ }
+ }
+
+ /**
+ * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
+ *
+ * @param tableSource The [[TableSource]] for which the expression is extracted.
+ * @param selectedFields The selected fields of the [[TableSource]].
+ * If None, all fields are selected.
+ * @param cluster The [[RelOptCluster]] of the current optimization process.
+ * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
+ * @param resultType The result type of the timestamp expression.
+ * @return The [[RexNode]] expression to extract the timestamp of the table source.
+ */
+ def getRowtimeExtractionExpression(
+ tableSource: TableSource[_],
+ selectedFields: Option[Array[Int]],
+ cluster: RelOptCluster,
+ relBuilder: RelBuilder,
+ resultType: TypeInformation[_]): Option[RexNode] = {
+
+ val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ /**
+ * Creates a RelNode with a schema that corresponds to the given fields.
+ * Fields for which no information is available will have default values.
+ */
+ def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = {
+ val maxIdx = fields.map(_._2).max
+ val idxMap: Map[Int, (String, InternalType)] = Map(
+ fields.map(f => f._2 ->(f._1, TypeConverters.createInternalTypeFromTypeInfo(f._3))): _*)
+ val (physicalFields, physicalTypes) = (0 to maxIdx)
+ .map(i => idxMap.getOrElse(i, ("", BYTE))).unzip
+ val physicalSchema: RelDataType = typeFactory.buildRelDataType(
+ physicalFields,
+ physicalTypes)
+ LogicalValues.create(
+ cluster,
+ physicalSchema,
+ ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
+ }
+
+ val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields)
+ rowtimeDesc.map { r =>
+ val tsExtractor = r.getTimestampExtractor
+
+ val fieldAccesses: Array[ResolvedFieldReference] =
+ if (tsExtractor.getArgumentFields.nonEmpty) {
+ val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
+ // push an empty values node with the physical schema on the relbuilder
+ relBuilder.push(createSchemaRelNode(resolvedFields))
+ // get extraction expression
+ resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3, f._2))
+ } else {
+ new Array[ResolvedFieldReference](0)
+ }
+
+ val expression = tsExtractor.getExpression(fieldAccesses)
+ // add cast to requested type and convert expression to RexNode
+ val castExpression = new CallExpression(
+ BuiltInFunctionDefinitions.CAST,
+ List(expression, new TypeLiteralExpression(resultType)))
+ val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
+ relBuilder.clear()
+ rexExpression
+ }
+ }
+
+ /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
+ private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
+ tableSource match {
+ case r: DefinedRowtimeAttributes =>
+ r.getRowtimeAttributeDescriptors.map(_.getAttributeName).toArray
+ case _ =>
+ Array()
+ }
+ }
+
+ /** Returns the proctime attribute of the [[TableSource]] if it is defined. */
+ private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = {
+ tableSource match {
+ case p: DefinedProctimeAttribute if p.getProctimeAttribute != null =>
+ Some(p.getProctimeAttribute)
+ case _ =>
+ None
+ }
+ }
+
+ /**
+ * Identifies, for a field name of the logical schema, the corresponding physical field in the
+ * return type of a [[TableSource]].
+ *
+ * @param fieldName The logical field to look up.
+ * @param tableSource The table source in which to look for the field.
+ * @return The name, index, and type information of the physical field.
+ */
+ private def resolveInputField(
+ fieldName: String,
+ tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
+
+ val returnType = tableSource.getReturnType
+
+ /** Look up a field by name in a type information */
+ def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
+ returnType match {
+
+ case c: CompositeType[_] =>
+ // get and check field index
+ val idx = c.getFieldIndex(fieldName)
+ if (idx < 0) {
+ throw new ValidationException(failMsg)
+ }
+ // return field name, index, and field type
+ (fieldName, idx, c.getTypeAt(idx))
+
+ case t: TypeInformation[_] =>
+ // no composite type, we return the full atomic type as field
+ (fieldName, 0, t)
+ }
+ }
+
+ tableSource match {
+ case d: DefinedFieldMapping if d.getFieldMapping != null =>
+ // resolve field name in field mapping
+ val resolvedFieldName = d.getFieldMapping.get(fieldName)
+ if (resolvedFieldName == null) {
+ throw new ValidationException(
+ s"Field '$fieldName' could not be resolved by the field mapping.")
+ }
+ // look up resolved field in return type
+ lookupField(
+ resolvedFieldName,
+ s"Table field '$fieldName' was resolved to TableSource return type field " +
+ s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
+ s"type $returnType of the TableSource. " +
+ s"Please verify the field mapping of the TableSource.")
+ case _ =>
+ // look up field in return type
+ lookupField(
+ fieldName,
+ s"Table field '$fieldName' was not found in the return type $returnType of the " +
+ s"TableSource.")
+ }
+ }
+
+ /**
+ * Identifies the physical fields in the return type
+ * [[org.apache.flink.api.common.typeinfo.TypeInformation]] of a [[TableSource]]
+ * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
+ *
+ * @param fieldNames The field names to look up.
+ * @param tableSource The table source in which to look for the field.
+ * @return The name, index, and type information of each physical field.
+ */
+ private def resolveInputFields(
+ fieldNames: Array[String],
+ tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = {
+ fieldNames.map(resolveInputField(_, tableSource))
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
new file mode 100644
index 0000000..b144312
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+import java.util.Objects
+import javax.annotation.Nullable
+
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy
+
+/**
+ * Extends a [[TableSource]] to specify a processing time attribute.
+ */
+trait DefinedProctimeAttribute {
+
+ /**
+ * Returns the name of a processing time attribute or null if no processing time attribute is
+ * present.
+ *
+ * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
+ * type [[Types.SQL_TIMESTAMP]].
+ *
+ * @return The name of the processing time attribute, or null if the table has none.
+ */
+ @Nullable
+ def getProctimeAttribute: String
+}
+
+/**
+ * Extends a [[TableSource]] to specify rowtime attributes via a
+ * [[RowtimeAttributeDescriptor]].
+ */
+trait DefinedRowtimeAttributes {
+
+ /**
+ * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
+ *
+ * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
+ * type [[Types.SQL_TIMESTAMP]].
+ *
+ * Each descriptor bundles the attribute name with the timestamp extractor and the watermark
+ * strategy to use for that attribute.
+ *
+ * @return A list of [[RowtimeAttributeDescriptor]].
+ */
+ def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
+}
+
+/**
+ * Describes a rowtime attribute of a [[TableSource]].
+ *
+ * @param attributeName The name of the rowtime attribute.
+ * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
+ * @param watermarkStrategy The watermark strategy associated with the attribute.
+ */
+class RowtimeAttributeDescriptor(
+    val attributeName: String,
+    val timestampExtractor: TimestampExtractor,
+    val watermarkStrategy: WatermarkStrategy) {
+
+  /** Java-style accessor for the name of the rowtime attribute. */
+  def getAttributeName: String = attributeName
+
+  /** Java-style accessor for the [[TimestampExtractor]] of the attribute. */
+  def getTimestampExtractor: TimestampExtractor = timestampExtractor
+
+  /** Java-style accessor for the [[WatermarkStrategy]] of the attribute. */
+  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
+
+  /**
+   * Two descriptors are equal when their name, extractor, and strategy are pairwise equal.
+   * `Objects.equals` is used so that null components compare without throwing.
+   */
+  override def equals(other: Any): Boolean = other match {
+    // The name is checked first (in the guard); the remaining components are only compared
+    // when the names already agree.
+    case that: RowtimeAttributeDescriptor
+        if Objects.equals(attributeName, that.attributeName) =>
+      Objects.equals(timestampExtractor, that.timestampExtractor) &&
+        Objects.equals(watermarkStrategy, that.watermarkStrategy)
+    case _ => false
+  }
+
+  /** Hashes the same three components that `equals` compares. */
+  override def hashCode(): Int =
+    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
new file mode 100644
index 0000000..1e1388e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.descriptors.Rowtime
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.`type`.DecimalType
+import org.apache.flink.table.typeutils.DecimalTypeInfo
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Converts an existing [[Long]], [[java.sql.Timestamp]], or
+ * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
+ * a rowtime attribute.
+ *
+ * @param field The field to convert into a rowtime attribute.
+ */
+final class ExistingField(val field: String) extends TimestampExtractor {
+
+  /** The extractor reads exactly one input field: the configured `field`. */
+  override def getArgumentFields: Array[String] = Array(field)
+
+  /**
+   * Checks that the referenced field has a type that can be converted into a rowtime attribute.
+   *
+   * @param argumentFieldTypes Types of the argument fields; only the first entry is inspected.
+   * @throws ValidationException if the first entry is not LONG, SQL_TIMESTAMP, or STRING
+   *                             (a null entry is rejected the same way).
+   */
+  @throws[ValidationException]
+  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+    val fieldType = argumentFieldTypes(0)
+
+    fieldType match {
+      case Types.LONG => // OK
+      case Types.SQL_TIMESTAMP => // OK
+      case Types.STRING => // OK
+      case _ =>
+        // A wildcard (rather than a typed pattern) is used so that a null type is also reported
+        // as a ValidationException instead of escaping as a scala.MatchError.
+        throw new ValidationException(
+          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
+    }
+  }
+
+  /**
+   * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
+   * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000")
+   * into a rowtime attribute.
+   *
+   * @param fieldAccesses Resolved references of the argument fields; only the first entry is
+   *                      used and it must be a [[PlannerResolvedFieldReference]].
+   * @return The field reference itself for SQL_TIMESTAMP fields, a CAST to SQL_TIMESTAMP for
+   *         STRING fields, and a CAST of (field / 1000) to SQL_TIMESTAMP for LONG fields.
+   * @throws ValidationException if the field has any other type.
+   */
+  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+    val fieldAccess: PlannerResolvedFieldReference = fieldAccesses(0)
+      .asInstanceOf[PlannerResolvedFieldReference]
+
+    val fieldReferenceExpr = new FieldReferenceExpression(
+      fieldAccess.name,
+      fieldAccess.resultType,
+      0,
+      fieldAccess.fieldIndex)
+
+    fieldAccess.resultType match {
+      case Types.LONG =>
+        // access LONG field
+        // NOTE(review): the value is divided by 1000 before the cast, presumably because the
+        // field holds epoch milliseconds while the numeric-to-timestamp cast expects seconds.
+        // Confirm against the planner's cast semantics.
+        val innerDiv = new CallExpression(
+          BuiltInFunctionDefinitions.DIVIDE,
+          List(fieldReferenceExpr,
+            new ValueLiteralExpression(new java.math.BigDecimal(1000),
+              DecimalTypeInfo.of(DecimalType.MAX_PRECISION, DecimalType.MAX_COMPACT_PRECISION)))
+        )
+        new CallExpression(
+          BuiltInFunctionDefinitions.CAST,
+          List(innerDiv, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+      case Types.SQL_TIMESTAMP =>
+        fieldReferenceExpr
+      case Types.STRING =>
+        new CallExpression(
+          BuiltInFunctionDefinitions.CAST,
+          List(fieldReferenceExpr, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+      case unsupported =>
+        // Mirrors validateArgumentFields so that calling this method without prior validation
+        // fails with a descriptive error rather than a scala.MatchError.
+        throw new ValidationException(
+          s"Field '$field' must be of type Long or Timestamp or String but is of type $unsupported.")
+    }
+  }
+
+  /** Extractors are equal when they refer to the same field name. */
+  override def equals(other: Any): Boolean = other match {
+    case that: ExistingField => field == that.field
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    field.hashCode
+  }
+
+  /** Describes this extractor as properties: timestamps type "from field" plus the field name. */
+  override def toProperties: util.Map[String, String] = {
+    val javaMap = new util.HashMap[String, String]()
+    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
+    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
+    javaMap
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
new file mode 100644
index 0000000..8dbe0e1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+ * A watermark strategy for ascending rowtime attributes.
+ *
+ * Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+  // Largest timestamp observed so far. It starts one above Long.MinValue so that the
+  // `maxTimestamp - 1` in getWatermark cannot underflow before any timestamp has been seen.
+  var maxTimestamp: Long = Long.MinValue + 1
+
+  /** Records `timestamp` if it exceeds every timestamp observed before. */
+  override def nextTimestamp(timestamp: Long): Unit = {
+    maxTimestamp = math.max(maxTimestamp, timestamp)
+  }
+
+  /** The watermark trails the largest observed timestamp by one. */
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+
+  /** All instances compare equal; the tracked `maxTimestamp` is not part of equality. */
+  override def equals(obj: Any): Boolean = obj.isInstanceOf[AscendingTimestamps]
+
+  /** Constant per class, consistent with the state-independent `equals`. */
+  override def hashCode(): Int = classOf[AscendingTimestamps].hashCode()
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
new file mode 100644
index 0000000..088ac80
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.descriptors.Rowtime
+
+import java.util
+
+/** A periodic watermark assigner. */
+abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
+
+ /**
+ * Updates the assigner with the next timestamp.
+ *
+ * NOTE(review): presumably invoked once per record with that record's rowtime value, so
+ * implementations can track state between calls -- confirm against the calling operator.
+ *
+ * @param timestamp The next timestamp to update the assigner.
+ */
+ def nextTimestamp(timestamp: Long): Unit
+
+ /**
+ * Returns the current watermark.
+ *
+ * The result is derived from whatever state the implementation accumulated through
+ * [[nextTimestamp]].
+ *
+ * @return The current watermark.
+ */
+ def getWatermark: Watermark
+}
+
+/** A punctuated watermark assigner. */
+abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
+
+ /**
+ * Returns the watermark for the current row or null if no watermark should be generated.
+ *
+ * Unlike a periodic assigner, the decision is made per row: the implementation receives both
+ * the row and its timestamp value and may return a watermark for that row.
+ *
+ * @param row The current row.
+ * @param timestamp The value of the timestamp attribute for the row.
+ * @return The watermark for this row or null if no watermark should be generated.
+ */
+ def getWatermark(row: BaseRow, timestamp: Long): Watermark
+}
+
+/** A strategy which indicates that watermarks should be preserved from the underlying DataStream. */
+final class PreserveWatermarks extends WatermarkStrategy {
+
+  /** The strategy carries no state, so any two instances are interchangeable. */
+  override def equals(obj: scala.Any): Boolean = obj.isInstanceOf[PreserveWatermarks]
+
+  /** Constant per class, consistent with `equals`. */
+  override def hashCode(): Int = classOf[PreserveWatermarks].hashCode()
+
+  /**
+   * Describes this strategy as a single property that maps
+   * `Rowtime.ROWTIME_WATERMARKS_TYPE` to `Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE`.
+   */
+  override def toProperties: util.Map[String, String] = {
+    val properties = new util.HashMap[String, String]()
+    properties.put(
+      Rowtime.ROWTIME_WATERMARKS_TYPE,
+      Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+    properties
+  }
+}
+
+object PreserveWatermarks {
+  /** Shared instance; since all instances compare equal, callers can reuse this one. */
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
new file mode 100644
index 0000000..1e6c3cf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.util.TestTableSourceWithTime
+import org.apache.flink.types.Row
+
+import org.junit.Test
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.sql.Timestamp
+
+import scala.collection.JavaConversions._
+
+/**
+ * Integration tests that register table sources in the batch test environment and verify the
+ * rows produced by SQL scans over them.
+ *
+ * `tEnv` and `checkResult` are not defined in this class; presumably they are inherited from
+ * BatchTestBase.
+ */
+class TableScanITCase extends BatchTestBase {
+
+ /**
+ * Scans an inline BatchTableSource whose table schema and return type declare the same three
+ * fields and that defines no time attribute.
+ */
+ @Test
+ def testTableSourceWithoutTimeAttribute(): Unit = {
+ val tableName = "MyTable"
+
+ val tableSource = new BatchTableSource[Row]() {
+ private val fieldNames: Array[String] = Array("name", "id", "value")
+ private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+
+ override def getBoundedStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+ val data = Seq(
+ row("Mary", new JLong(1L), new JInt(1)),
+ row("Bob", new JLong(2L), new JInt(3))
+ )
+ // `data` is a Scala Seq; the JavaConversions import supplies the Java collection.
+ execEnv.fromCollection(data).returns(getReturnType)
+ }
+
+ override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+ override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+ }
+ tEnv.registerTableSource(tableName, tableSource)
+
+ checkResult(
+ s"SELECT * from $tableName",
+ Seq(
+ row("Mary", 1L, 1),
+ row("Bob", 2L, 3))
+ )
+ }
+
+ /**
+ * Scans a source whose physical type is a single String while the table schema additionally
+ * declares `ptime`, which is passed to the source as its proctime attribute. Only `name` is
+ * selected.
+ */
+ @Test
+ def testProctimeTableSource(): Unit = {
+ val tableName = "MyTable"
+ val data = Seq("Mary", "Peter", "Bob", "Liz")
+ val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+ val returnType = Types.STRING
+
+ // Arguments four and five are rowtime (none here) and proctime ("ptime").
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ checkResult(
+ s"SELECT name FROM $tableName",
+ Seq(
+ row("Mary"),
+ row("Peter"),
+ row("Bob"),
+ row("Liz"))
+ )
+ }
+
+ /**
+ * Scans a source that declares `rtime` as its rowtime attribute; the physical field is already
+ * a SQL timestamp and all columns are selected.
+ */
+ @Test
+ def testRowtimeTableSource(): Unit = {
+ val tableName = "MyTable"
+ val data = Seq(
+ row("Mary", new Timestamp(1L), new JInt(10)),
+ row("Bob", new Timestamp(2L), new JInt(20)),
+ row("Mary", new Timestamp(2L), new JInt(30)),
+ row("Liz", new Timestamp(2001L), new JInt(40)))
+
+ val fieldNames = Array("name", "rtime", "amount")
+ val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+ val rowType = new RowTypeInfo(
+ Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ // Arguments four and five are rowtime ("rtime") and proctime (none here).
+ val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ // NOTE(review): the expected rows are listed in a different order than the input rows, so
+ // checkResult presumably compares without regard to order -- confirm in BatchTestBase.
+ checkResult(
+ s"SELECT * FROM $tableName",
+ Seq(
+ row("Mary", new Timestamp(1L), new JInt(10)),
+ row("Mary", new Timestamp(2L), new JInt(30)),
+ row("Bob", new Timestamp(2L), new JInt(20)),
+ row("Liz", new Timestamp(2001L), new JInt(40)))
+ )
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
new file mode 100644
index 0000000..67a2f59
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.util.{TestPreserveWMTableSource, TestTableSourceWithTime}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+import java.lang.{Integer => JInt, Long => JLong}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Integration tests that register table sources in the streaming test environment, run a SQL
+ * scan as an append stream, and compare the collected sink output with the expected rows.
+ *
+ * `tEnv` and `env` are not defined in this class; presumably they are inherited from
+ * StreamingTestBase.
+ */
+class TableScanITCase extends StreamingTestBase {
+
+ /**
+ * Scans an inline StreamTableSource whose table schema and return type declare the same three
+ * fields and that defines no time attribute.
+ */
+ @Test
+ def testTableSourceWithoutTimeAttribute(): Unit = {
+ val tableName = "MyTable"
+
+ val tableSource = new StreamTableSource[Row]() {
+ private val fieldNames: Array[String] = Array("name", "id", "value")
+ private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+
+ override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+ val data = Seq(
+ Row.of("Mary", new JLong(1L), new JInt(1)),
+ Row.of("Bob", new JLong(2L), new JInt(3))
+ )
+ // `data` is a Scala Seq; the JavaConversions import supplies the Java collection.
+ execEnv.fromCollection(data).returns(getReturnType)
+ }
+
+ override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+ override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+ }
+ tEnv.registerTableSource(tableName, tableSource)
+ val sqlQuery = s"SELECT * from $tableName"
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ // Both sides are sorted, so the comparison does not depend on arrival order.
+ val expected = Seq("Mary,1,1", "Bob,2,3")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ /**
+ * Scans a source whose physical type is a single String while the table schema additionally
+ * declares `ptime`, which is passed to the source as its proctime attribute. Only `name` is
+ * selected.
+ */
+ @Test
+ def testProctimeTableSource(): Unit = {
+ val tableName = "MyTable"
+
+ val data = Seq("Mary", "Peter", "Bob", "Liz")
+
+ val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+ val returnType = Types.STRING
+
+ // Arguments four and five are rowtime (none here) and proctime ("ptime").
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val sqlQuery = s"SELECT name FROM $tableName"
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = Seq("Mary", "Peter", "Bob", "Liz")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ /**
+ * Scans a source that emits its own timestamps and watermarks and declares `rtime` as rowtime
+ * attribute with the PreserveWatermarks strategy. Each result row is extended with the
+ * watermark that was current while the row was processed, and those values are asserted.
+ */
+ @Test
+ def testRowtimeTableSourcePreserveWatermarks(): Unit = {
+ val tableName = "MyTable"
+
+ // rows with timestamps and watermarks
+ // Left is a (timestamp, row) pair and Right is a watermark value, matching the
+ // Seq[Either[(Long, T), Long]] parameter of TestPreserveWMTableSource.
+ val data = Seq(
+ Right(1L),
+ Left(5L, Row.of(new JInt(1), new JLong(5), "A")),
+ Left(2L, Row.of(new JInt(2), new JLong(1), "B")),
+ Right(10L),
+ Left(8L, Row.of(new JInt(6), new JLong(8), "C")),
+ Right(20L),
+ Left(21L, Row.of(new JInt(6), new JLong(21), "D")),
+ Right(30L)
+ )
+
+ // The schema declares `rtime` as SQL_TIMESTAMP while the physical row type carries it as LONG.
+ val fieldNames = Array("id", "rtime", "name")
+ val schema = new TableSchema(fieldNames, Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
+ val rowType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ fieldNames)
+
+ val tableSource = new TestPreserveWMTableSource(schema, rowType, data, "rtime")
+ tEnv.registerTableSource(tableName, tableSource)
+ val sqlQuery = s"SELECT id, name FROM $tableName"
+ val sink = new TestingAppendSink
+
+ tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ // append current watermark to each row to verify that original watermarks were preserved
+ .process(new ProcessFunction[Row, Row] {
+
+ override def processElement(
+ value: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+ // Output row: id, name, and the watermark currently known to the timer service.
+ val res = new Row(3)
+ res.setField(0, value.getField(0))
+ res.setField(1, value.getField(1))
+ res.setField(2, ctx.timerService().currentWatermark())
+ out.collect(res)
+ }
+ }).addSink(sink)
+ env.execute()
+
+ // The third column is the watermark from `data` that precedes each row: 1, 1, 10, 20.
+ val expected = Seq("1,A,1", "2,B,1", "6,C,10", "6,D,20")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
new file mode 100644
index 0000000..c40f457
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.tsextractors.ExistingField
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConversions._
+
+class TestTableSourceWithTime[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T],
+    rowtime: String = null,
+    proctime: String = null,
+    mapping: Map[String, String] = null)
+  extends StreamTableSource[T]
+  with BatchTableSource[T]
+  with DefinedRowtimeAttributes
+  with DefinedProctimeAttribute
+  with DefinedFieldMapping {
+
+  // Test source backed by an in-memory collection. It serves both the streaming and the
+  // bounded scan and optionally exposes a rowtime attribute, a proctime attribute, and a
+  // field mapping, each of which is disabled by passing null.
+
+  override def getTableSchema: TableSchema = tableSchema
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def explainSource(): String = ""
+
+  /** The name handed in as `proctime`, or null when none was configured. */
+  override def getProctimeAttribute: String = proctime
+
+  /** Streaming scan over the in-memory `values`. */
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] =
+    collectionSource(execEnv)
+
+  /** Bounded scan over the same in-memory `values`. */
+  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[T] =
+    collectionSource(streamEnv)
+
+  /**
+   * A single descriptor (existing field + ascending timestamps) when `rowtime` is set,
+   * otherwise an empty list.
+   */
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    if (rowtime == null) {
+      Collections.emptyList[RowtimeAttributeDescriptor]()
+    } else {
+      val descriptor = new RowtimeAttributeDescriptor(
+        rowtime,
+        new ExistingField(rowtime),
+        new AscendingTimestamps)
+      Collections.singletonList(descriptor)
+    }
+  }
+
+  /** The configured mapping as a Java map (via the implicit conversion), or null if absent. */
+  override def getFieldMapping: util.Map[String, String] = {
+    if (mapping == null) null else mapping
+  }
+
+  // Both scan variants read the same collection with the declared return type; the Scala Seq
+  // is handed to the Java API through the JavaConversions implicits imported by this file.
+  private def collectionSource(environment: StreamExecutionEnvironment): DataStreamSource[T] =
+    environment.fromCollection(values, returnType)
+}
+
+class TestPreserveWMTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[Either[(Long, T), Long]],
+    rowtime: String)
+  extends StreamTableSource[T]
+  with DefinedRowtimeAttributes {
+
+  // Test source whose rowtime attribute uses the PreserveWatermarks strategy. The elements of
+  // `values` are handed unchanged to EventTimeSourceFunction; presumably a Left is a
+  // (timestamp, record) pair and a Right is a watermark -- confirm in TimeTestUtil.
+
+  override def getTableSchema: TableSchema = tableSchema
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def explainSource(): String = ""
+
+  /** Exactly one descriptor: `rowtime` read from the existing field, watermarks preserved. */
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    val descriptor = new RowtimeAttributeDescriptor(
+      rowtime,
+      new ExistingField(rowtime),
+      PreserveWatermarks.INSTANCE)
+    Collections.singletonList(descriptor)
+  }
+
+  /** Builds the source with parallelism 1 and the declared return type. */
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    val eventTimeSource = execEnv.addSource(new EventTimeSourceFunction[T](values))
+    eventTimeSource.setParallelism(1).returns(returnType)
+  }
+
+}