Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/14 01:21:51 UTC

[flink] branch master updated: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2826ff8  [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
2826ff8 is described below

commit 2826ff80c4b056d7af589649238b3acabca43837
Author: Jing Zhang <be...@126.com>
AuthorDate: Tue May 14 09:21:39 2019 +0800

    [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
---
 .../PlannerResolvedFieldReference.scala            |  26 ++
 .../logical/FlinkLogicalTableSourceScan.scala      |  15 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  62 +++-
 .../stream/StreamExecTableSourceScan.scala         | 148 +++++++++-
 .../apache/flink/table/plan/util/ScanUtil.scala    |   2 +-
 .../flink/table/sources/TableSourceUtil.scala      | 321 ++++++++++++++++++++-
 .../table/sources/definedTimeAttributes.scala      |  95 ++++++
 .../table/sources/tsextractors/ExistingField.scala | 108 +++++++
 .../sources/wmstrategies/AscendingTimestamps.scala |  49 ++++
 .../sources/wmstrategies/watermarkStrategies.scala |  81 ++++++
 .../table/runtime/batch/sql/TableScanITCase.scala  | 119 ++++++++
 .../table/runtime/stream/sql/TableScanITCase.scala | 144 +++++++++
 .../apache/flink/table/util/testTableSources.scala | 106 +++++++
 13 files changed, 1254 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
new file mode 100644
index 0000000..a3406f8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class PlannerResolvedFieldReference(
+    name: String,
+    resultType: TypeInformation[_],
+    fieldIndex: Int) extends ResolvedFieldReference
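
For illustration, a minimal sketch of constructing and inspecting such a reference
(the field name, type, and index below are made up, not part of the commit):

    import org.apache.flink.api.common.typeinfo.Types

    // hypothetical rowtime field sitting at physical index 1 of the source type
    val ref = PlannerResolvedFieldReference("rtime", Types.LONG, 1)
    // case-class fields are directly accessible
    println(s"${ref.name} @ ${ref.fieldIndex}: ${ref.resultType}")
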
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index cb94b3a..7b2b7a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
@@ -49,7 +48,9 @@ class FlinkLogicalTableSourceScan(
   extends TableScan(cluster, traitSet, relOptTable)
   with FlinkLogicalRel {
 
-  val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+  lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
+
+  private lazy val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
 
   def copy(
       traitSet: RelTraitSet,
@@ -63,15 +64,7 @@ class FlinkLogicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    tableSource match {
-      case s: StreamTableSource[_] =>
-        TableSourceUtil.getRelDataType(s, None, streaming = true, flinkTypeFactory)
-      case _: BatchTableSource[_] =>
-        flinkTypeFactory.buildLogicalRowType(
-          tableSource.getTableSchema, isStreaming = Option.apply(false))
-      case _ => throw new TableException("Unknown TableSource type.")
-    }
+    tableSourceTable.getRowType(flinkTypeFactory)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 93acefa..ee39511 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -18,18 +18,23 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.plan.util.ScanUtil
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 
 import java.util
 
@@ -75,7 +80,58 @@ class BatchExecTableSourceScan(
 
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val config = tableEnv.getConfig
+    val bts = tableSource.asInstanceOf[BatchTableSource[_]]
+    val inputTransform = bts.getBoundedStream(tableEnv.streamEnv).getTransformation
+
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = false,
+      None)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
+      createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataSet of type ${inputTransform.getOutputType} that does not match with " +
+        s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
+        s"method. Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      None,
+      cluster,
+      tableEnv.getRelBuilder,
+      Types.SQL_TIMESTAMP
+    )
+    if (needInternalConversion) {
+      ScanUtil.convertToInternalRow(
+        CodeGeneratorContext(config),
+        inputTransform.asInstanceOf[StreamTransformation[Any]],
+        fieldIndexes,
+        tableSource.getReturnType,
+        getRowType,
+        getTable.getQualifiedName,
+        config,
+        rowtimeExpression)
+    } else {
+      inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
+    }
+
   }
 
+  def needInternalConversion: Boolean = {
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = false,
+      None)
+    ScanUtil.hasTimeAttributeField(fieldIndexes) ||
+      ScanUtil.needsConversion(
+        tableSource.getReturnType,
+        TypeExtractor.createTypeInfo(
+          tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
+          .getTypeClass.asInstanceOf[Class[_]])
+  }
 }
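
The batch scan only generates a conversion operator when a time attribute must be
materialized or the source output is not already in the internal row format; a toy
sketch of that decision (names are illustrative, mirroring needInternalConversion):

    // sketch: conversion is needed if either condition holds
    def requiresConversion(hasTimeAttribute: Boolean, alreadyInternal: Boolean): Boolean =
      hasTimeAttribute || !alreadyInternal

    assert(requiresConversion(hasTimeAttribute = true, alreadyInternal = true))
    assert(!requiresConversion(hasTimeAttribute = false, alreadyInternal = true))
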
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 4b2a8aa..096604b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -18,17 +18,28 @@
 
 package org.apache.flink.table.plan.nodes.physical.stream
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
 import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException, Types}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource, TableSourceUtil}
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.codegen.OperatorCodeGenerator._
+import org.apache.flink.table.plan.util.ScanUtil
+import org.apache.flink.table.runtime.AbstractProcessStreamOperator
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 
 import java.util
 
@@ -79,6 +90,137 @@ class StreamExecTableSourceScan(
 
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val config = tableEnv.getConfig
+    val sts = tableSource.asInstanceOf[StreamTableSource[_]]
+    val inputTransform = sts.getDataStream(tableEnv.execEnv).getTransformation
+
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      None)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
+      createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataStream of type ${inputTransform.getOutputType} that does not match with " +
+        s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
+        s"method. Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      None,
+      cluster,
+      tableEnv.getRelBuilder,
+      Types.SQL_TIMESTAMP
+    )
+
+    val streamTransformation = if (needInternalConversion) {
+      // stash the element so time attribute fields (marker indexes -1 / -2) can be extracted.
+      val (extractElement, resetElement) =
+        if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
+          (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
+        } else {
+          ("", "")
+        }
+      val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
+        classOf[AbstractProcessStreamOperator[BaseRow]])
+      ScanUtil.convertToInternalRow(
+        ctx,
+        inputTransform.asInstanceOf[StreamTransformation[Any]],
+        fieldIndexes,
+        tableSource.getReturnType,
+        getRowType,
+        getTable.getQualifiedName,
+        config,
+        rowtimeExpression,
+        beforeConvert = extractElement,
+        afterConvert = resetElement)
+    } else {
+      inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
+    }
+
+    val ingestedTable = new DataStream(tableEnv.execEnv, streamTransformation)
+
+    // generate watermarks for rowtime indicator
+    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)
+
+    val withWatermarks = if (rowtimeDesc.isDefined) {
+      val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+      watermarkStrategy match {
+        case p: PeriodicWatermarkAssigner =>
+          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case p: PunctuatedWatermarkAssigner =>
+          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case _: PreserveWatermarks =>
+          // The watermarks have already been provided by the underlying DataStream.
+          ingestedTable
+      }
+    } else {
+      // No need to generate watermarks if no rowtime attribute is specified.
+      ingestedTable
+    }
+
+    withWatermarks.getTransformation
+  }
+
+  def needInternalConversion: Boolean = {
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      None)
+    ScanUtil.hasTimeAttributeField(fieldIndexes) ||
+      ScanUtil.needsConversion(
+        tableSource.getReturnType,
+        TypeExtractor.createTypeInfo(
+          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
+          .getTypeClass.asInstanceOf[Class[_]])
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[BaseRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(row: BaseRow, previousElementTimestamp: Long): Long = {
+    val timestamp: Long = row.getLong(timeFieldIdx)
+    assigner.nextTimestamp(timestamp)
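+    // the record timestamp returned below is not used: rowtime is read from the row's time field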
+    0L
+  }
+}
+
+/**
+  * Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PunctuatedWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PunctuatedWatermarkAssigner)
+  extends AssignerWithPunctuatedWatermarks[BaseRow] {
+
+  override def checkAndGetNextWatermark(row: BaseRow, ts: Long): Watermark = {
+    val timestamp: Long = row.getLong(timeFieldIdx)
+    assigner.getWatermark(row, timestamp)
+  }
+
+  override def extractTimestamp(element: BaseRow, previousElementTimestamp: Long): Long = {
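+    // the record timestamp returned below is not used: rowtime is read from the row's time field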
+    0L
   }
 }
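
The periodic wrapper above returns 0L from extractTimestamp because only the assigner
state matters for watermark generation. A toy trace of the mechanism, using the
AscendingTimestamps strategy added in this commit:

    val assigner = new AscendingTimestamps
    // extractTimestamp(...) feeds row.getLong(timeFieldIdx) into the assigner
    Seq(5L, 1L, 8L).foreach(assigner.nextTimestamp)
    // on each watermark interval, getCurrentWatermark delegates to the assigner
    assert(assigner.getWatermark.getTimestamp == 7L)  // max(5, 1, 8) - 1
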
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
index f38604b..a1c85d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ScanUtil.scala
@@ -77,7 +77,7 @@ object ScanUtil {
       val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
       internalInType match {
         case rt: RowType => (convertFunc, rt)
-        case _ => ((record: String) => s"$GENERIC_ROW.wrap(${convertFunc(record)})",
+        case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
             new RowType(internalInType))
       }
     }
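
The hunk above swaps GENERIC_ROW.wrap(...) for GENERIC_ROW.of(...) in the code
generated for atomic input types. Assuming GenericRow.of is the varargs row factory
of the blink dataformat package, the generated call behaves like:

    import org.apache.flink.table.dataformat.GenericRow

    // assumption: GenericRow.of(values: AnyRef*) builds a row from the given field values
    val row = GenericRow.of("atomic-value")
    assert(row.getArity == 1)
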
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 985cc37..f6b22f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -18,15 +18,114 @@
 
 package org.apache.flink.table.sources
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.`type`.InternalType
+import org.apache.flink.table.`type`.InternalTypes.{BYTE, PROCTIME_BATCH_MARKER, PROCTIME_INDICATOR, PROCTIME_STREAM_MARKER, ROWTIME_BATCH_MARKER, ROWTIME_INDICATOR, ROWTIME_STREAM_MARKER}
+import org.apache.flink.table.expressions.{BuiltInFunctionDefinitions, CallExpression, PlannerResolvedFieldReference, ResolvedFieldReference, RexNodeConverter, TypeLiteralExpression}
 
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
 
 /** Util class for [[TableSource]]. */
 object TableSourceUtil {
 
   /**
+    * Computes the indices that map the input type of the DataStream to the schema of the table.
+    *
+    * The mapping is based on the field names and fails if a table field cannot be
+    * mapped to a field of the input type.
+    *
+    * @param tableSource The table source for which the table schema is mapped to the input type.
+    * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise.
+    * @param selectedFields The indexes of the table schema fields for which a mapping is
+    *                       computed. If None, a mapping for all fields is computed.
+    * @return An index mapping from input type to table schema.
+    */
+  def computeIndexMapping(
+      tableSource: TableSource[_],
+      isStreamTable: Boolean,
+      selectedFields: Option[Array[Int]]): Array[Int] = {
+
+    val tableSchema = tableSource.getTableSchema
+
+    // get names of selected fields
+    val tableFieldNames = if (selectedFields.isDefined) {
+      val names = tableSchema.getFieldNames
+      selectedFields.get.map(names(_))
+    } else {
+      tableSchema.getFieldNames
+    }
+
+    // get types of selected fields
+    val tableFieldTypes = if (selectedFields.isDefined) {
+      val types = tableSchema.getFieldTypes
+      selectedFields.get.map(types(_))
+    } else {
+      tableSchema.getFieldTypes
+    }
+
+    // get rowtime and proctime attributes
+    val rowtimeAttributes = getRowtimeAttributes(tableSource)
+    val proctimeAttributes = getProctimeAttribute(tableSource)
+
+    // compute mapping of selected fields and time attributes
+    val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map {
+      case (Types.SQL_TIMESTAMP, name: String)
+        if proctimeAttributes.contains(name) =>
+        if (isStreamTable) {
+          PROCTIME_STREAM_MARKER
+        } else {
+          PROCTIME_BATCH_MARKER
+        }
+      case (Types.SQL_TIMESTAMP, name: String)
+        if rowtimeAttributes.contains(name) =>
+        if (isStreamTable) {
+          ROWTIME_STREAM_MARKER
+        } else {
+          ROWTIME_BATCH_MARKER
+        }
+      case (t: TypeInformation[_], name) =>
+        // check if field is registered as time indicator
+        if (proctimeAttributes.contains(name)) {
+          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        if (rowtimeAttributes.contains(name)) {
+          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+
+        val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
+        // validate that the mapped field has the same type
+        if (tpe != t) {
+          throw new ValidationException(s"Type $t of table field '$name' does not " +
+            s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
+        }
+        idx
+    }
+    val inputType = tableSource.getReturnType
+
+    // ensure that only one field is mapped to an atomic type
+    if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
+      throw new ValidationException(
+        s"More than one table field matched to atomic input type $inputType.")
+    }
+
+    mapping
+  }
+
+  /**
     * Returns the Calcite schema of a [[TableSource]].
     *
     * @param tableSource The [[TableSource]] for which the Calcite schema is generated.
@@ -42,13 +141,29 @@ object TableSourceUtil {
       typeFactory: FlinkTypeFactory): RelDataType = {
 
     val fieldNames = tableSource.getTableSchema.getFieldNames
-    val fieldTypes = tableSource.getTableSchema.getFieldTypes
+    var fieldTypes = tableSource.getTableSchema.getFieldTypes
       .map(TypeConverters.createInternalTypeFromTypeInfo)
     // TODO get fieldNullables from TableSchema
-    val fieldNullables = fieldTypes.map(_ => true)
+    var fieldNullables = fieldTypes.map(_ => true)
 
-    // TODO supports time attributes after Expression is ready
+    if (streaming) {
+      // adjust the type of time attributes for streaming tables
+      val rowtimeAttributes = getRowtimeAttributes(tableSource)
+      val proctimeAttributes = getProctimeAttribute(tableSource)
 
+      // patch rowtime fields with time indicator type
+      rowtimeAttributes.foreach { rowtimeField =>
+        val idx = fieldNames.indexOf(rowtimeField)
+        fieldTypes = fieldTypes.patch(idx, Seq(ROWTIME_INDICATOR), 1)
+        fieldNullables = fieldNullables.patch(idx, Seq(false), 1)
+      }
+      // patch proctime field with time indicator type
+      proctimeAttributes.foreach { proctimeField =>
+        val idx = fieldNames.indexOf(proctimeField)
+        fieldTypes = fieldTypes.patch(idx, Seq(PROCTIME_INDICATOR), 1)
+        fieldNullables = fieldNullables.patch(idx, Seq(false), 1)
+      }
+    }
     val (selectedFieldNames, selectedFieldTypes, selectedFieldNullables) =
       if (selectedFields.isDefined) {
         // filter field names and types by selected fields
@@ -57,9 +172,207 @@ object TableSourceUtil {
           selectedFields.get.map(fieldTypes(_)),
           selectedFields.get.map(fieldNullables(_)))
       } else {
-        (fieldNames, fieldTypes, fieldNullables)
+        (fieldNames, fieldTypes, fieldNullables)
       }
     typeFactory.buildRelDataType(selectedFieldNames, selectedFieldTypes, selectedFieldNullables)
   }
 
+  /**
+    * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is
+    *                    returned.
+    * @param selectedFields The fields which are selected from the [[TableSource]].
+    *                       If None, all fields are selected.
+    * @return The [[RowtimeAttributeDescriptor]] of the [[TableSource]].
+    */
+  def getRowtimeAttributeDescriptor(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]]): Option[RowtimeAttributeDescriptor] = {
+
+    tableSource match {
+      case r: DefinedRowtimeAttributes =>
+        val descriptors = r.getRowtimeAttributeDescriptors
+        if (descriptors.size() == 0) {
+          None
+        } else if (descriptors.size > 1) {
+          throw new ValidationException("Table with has more than a single rowtime attribute..")
+        } else {
+          // exactly one rowtime attribute descriptor
+          if (selectedFields.isEmpty) {
+            // all fields are selected.
+            Some(descriptors.get(0))
+          } else {
+            val descriptor = descriptors.get(0)
+            // look up index of row time attribute in schema
+            val fieldIdx = tableSource.getTableSchema.getFieldNames.indexOf(
+              descriptor.getAttributeName)
+            // is field among selected fields?
+            if (selectedFields.get.contains(fieldIdx)) {
+              Some(descriptor)
+            } else {
+              None
+            }
+          }
+        }
+      case _ => None
+    }
+  }
+
+  /**
+    * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the expression is extracted.
+    * @param selectedFields The selected fields of the [[TableSource]].
+    *                       If None, all fields are selected.
+    * @param cluster The [[RelOptCluster]] of the current optimization process.
+    * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
+    * @param resultType The result type of the timestamp expression.
+    * @return The [[RexNode]] expression to extract the timestamp of the table source.
+    */
+  def getRowtimeExtractionExpression(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]],
+      cluster: RelOptCluster,
+      relBuilder: RelBuilder,
+      resultType: TypeInformation[_]): Option[RexNode] = {
+
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    /**
+      * Creates a RelNode with a schema that corresponds to the given fields.
+      * Fields for which no information is available will have default values.
+      */
+    def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = {
+      val maxIdx = fields.map(_._2).max
+      val idxMap: Map[Int, (String, InternalType)] = Map(
+        fields.map(f => f._2 -> (f._1, TypeConverters.createInternalTypeFromTypeInfo(f._3))): _*)
+      val (physicalFields, physicalTypes) = (0 to maxIdx)
+        .map(i => idxMap.getOrElse(i, ("", BYTE))).unzip
+      val physicalSchema: RelDataType = typeFactory.buildRelDataType(
+        physicalFields,
+        physicalTypes)
+      LogicalValues.create(
+        cluster,
+        physicalSchema,
+        ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
+    }
+
+    val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields)
+    rowtimeDesc.map { r =>
+      val tsExtractor = r.getTimestampExtractor
+
+      val fieldAccesses: Array[ResolvedFieldReference] =
+        if (tsExtractor.getArgumentFields.nonEmpty) {
+          val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
+          // push an empty Values node with the physical schema onto the RelBuilder
+          relBuilder.push(createSchemaRelNode(resolvedFields))
+          // get extraction expression
+          resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3, f._2))
+        } else {
+          new Array[ResolvedFieldReference](0)
+        }
+
+      val expression = tsExtractor.getExpression(fieldAccesses)
+      // add cast to requested type and convert expression to RexNode
+      val castExpression = new CallExpression(
+        BuiltInFunctionDefinitions.CAST,
+        List(expression, new TypeLiteralExpression(resultType)))
+      val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
+      relBuilder.clear()
+      rexExpression
+    }
+  }
+
+  /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
+  private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
+    tableSource match {
+      case r: DefinedRowtimeAttributes =>
+        r.getRowtimeAttributeDescriptors.map(_.getAttributeName).toArray
+      case _ =>
+        Array()
+    }
+  }
+
+  /** Returns the proctime attribute of the [[TableSource]] if it is defined. */
+  private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = {
+    tableSource match {
+      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null =>
+        Some(p.getProctimeAttribute)
+      case _ =>
+        None
+    }
+  }
+
+  /**
+    * Identifies for a field name of the logical schema, the corresponding physical field in the
+    * return type of a [[TableSource]].
+    *
+    * @param fieldName The logical field to look up.
+    * @param tableSource The table source in which to look for the field.
+    * @return The name, index, and type information of the physical field.
+    */
+  private def resolveInputField(
+      fieldName: String,
+      tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
+
+    val returnType = tableSource.getReturnType
+
+    /** Look up a field by name in a type information */
+    def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
+      returnType match {
+
+        case c: CompositeType[_] =>
+          // get and check field index
+          val idx = c.getFieldIndex(fieldName)
+          if (idx < 0) {
+            throw new ValidationException(failMsg)
+          }
+          // return field name, index, and field type
+          (fieldName, idx, c.getTypeAt(idx))
+
+        case t: TypeInformation[_] =>
+          // no composite type, we return the full atomic type as field
+          (fieldName, 0, t)
+      }
+    }
+
+    tableSource match {
+      case d: DefinedFieldMapping if d.getFieldMapping != null =>
+        // resolve field name in field mapping
+        val resolvedFieldName = d.getFieldMapping.get(fieldName)
+        if (resolvedFieldName == null) {
+          throw new ValidationException(
+            s"Field '$fieldName' could not be resolved by the field mapping.")
+        }
+        // look up resolved field in return type
+        lookupField(
+          resolvedFieldName,
+          s"Table field '$fieldName' was resolved to TableSource return type field " +
+            s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
+            s"type $returnType of the TableSource. " +
+            s"Please verify the field mapping of the TableSource.")
+      case _ =>
+        // look up field in return type
+        lookupField(
+          fieldName,
+          s"Table field '$fieldName' was not found in the return type $returnType of the " +
+            s"TableSource.")
+    }
+  }
+
+  /**
+    * Identifies the physical fields in the return type
+    * [[org.apache.flink.api.common.typeinfo.TypeInformation]] of a [[TableSource]]
+    * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
+    *
+    * @param fieldNames The field names to look up.
+    * @param tableSource The table source in which to look for the field.
+    * @return The names, indexes, and type information of the physical fields.
+    */
+  private def resolveInputFields(
+      fieldNames: Array[String],
+      tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = {
+    fieldNames.map(resolveInputField(_, tableSource))
+  }
 }
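
As a concrete illustration of computeIndexMapping: a proctime attribute has no
physical column, so it maps to a marker index rather than a physical position.
The sketch uses TestTableSourceWithTime, the test helper added at the end of this
commit:

    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.util.TestTableSourceWithTime

    val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
    val source = new TestTableSourceWithTime(schema, Types.STRING, Seq("Mary"), null, "ptime")

    val mapping = TableSourceUtil.computeIndexMapping(source, isStreamTable = true, None)
    // mapping(0) == 0: "name" resolves to the single (atomic) physical field
    // mapping(1) == PROCTIME_STREAM_MARKER: no physical field backs "ptime"
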
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
new file mode 100644
index 0000000..b144312
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+import java.util.Objects
+import javax.annotation.Nullable
+
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy
+
+/**
+  * Extends a [[TableSource]] to specify a processing time attribute.
+  */
+trait DefinedProctimeAttribute {
+
+  /**
+    * Returns the name of a processing time attribute or null if no processing time attribute is
+    * present.
+    *
+    * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
+    * type [[Types.SQL_TIMESTAMP]].
+    */
+  @Nullable
+  def getProctimeAttribute: String
+}
+
+/**
+  * Extends a [[TableSource]] to specify rowtime attributes via a
+  * [[RowtimeAttributeDescriptor]].
+  */
+trait DefinedRowtimeAttributes {
+
+  /**
+    * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
+    *
+    * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
+    * type [[Types.SQL_TIMESTAMP]].
+    *
+    * @return A list of [[RowtimeAttributeDescriptor]].
+    */
+  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
+}
+
+/**
+  * Describes a rowtime attribute of a [[TableSource]].
+  *
+  * @param attributeName The name of the rowtime attribute.
+  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
+  * @param watermarkStrategy The watermark strategy associated with the attribute.
+  */
+class RowtimeAttributeDescriptor(
+  val attributeName: String,
+  val timestampExtractor: TimestampExtractor,
+  val watermarkStrategy: WatermarkStrategy) {
+
+  /** Returns the name of the rowtime attribute. */
+  def getAttributeName: String = attributeName
+
+  /** Returns the [[TimestampExtractor]] for the attribute. */
+  def getTimestampExtractor: TimestampExtractor = timestampExtractor
+
+  /** Returns the [[WatermarkStrategy]] for the attribute. */
+  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
+
+  override def equals(other: Any): Boolean = other match {
+    case that: RowtimeAttributeDescriptor =>
+        Objects.equals(attributeName, that.attributeName) &&
+        Objects.equals(timestampExtractor, that.timestampExtractor) &&
+        Objects.equals(watermarkStrategy, that.watermarkStrategy)
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
+  }
+}
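
A minimal sketch of declaring a rowtime attribute with these interfaces; a real
source would also extend StreamTableSource, which is omitted here. ExistingField
and AscendingTimestamps are added later in this commit:

    import java.util
    import java.util.Collections

    import org.apache.flink.table.sources.tsextractors.ExistingField
    import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps

    class MyRowtimeSource extends DefinedRowtimeAttributes {
      override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
        Collections.singletonList(new RowtimeAttributeDescriptor(
          "rtime",                    // must be a SQL_TIMESTAMP field in the TableSchema
          new ExistingField("rtime"), // derive values from the physical "rtime" field
          new AscendingTimestamps))   // watermarks for ascending timestamps
    }
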
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
new file mode 100644
index 0000000..1e1388e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.descriptors.Rowtime
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.`type`.DecimalType
+import org.apache.flink.table.typeutils.DecimalTypeInfo
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Converts an existing [[Long]], [[java.sql.Timestamp]], or
+  * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
+  * a rowtime attribute.
+  *
+  * @param field The field to convert into a rowtime attribute.
+  */
+final class ExistingField(val field: String) extends TimestampExtractor {
+
+  override def getArgumentFields: Array[String] = Array(field)
+
+  @throws[ValidationException]
+  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+    val fieldType = argumentFieldTypes(0)
+
+    fieldType match {
+      case Types.LONG => // OK
+      case Types.SQL_TIMESTAMP => // OK
+      case Types.STRING => // OK
+      case _: TypeInformation[_] =>
+        throw new ValidationException(
+          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
+    }
+  }
+
+  /**
+    * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
+    * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000")
+    * into a rowtime attribute.
+    */
+  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+    val fieldAccess: PlannerResolvedFieldReference = fieldAccesses(0)
+      .asInstanceOf[PlannerResolvedFieldReference]
+
+    val fieldReferenceExpr = new FieldReferenceExpression(
+      fieldAccess.name,
+      fieldAccess.resultType,
+      0,
+      fieldAccess.fieldIndex)
+
+    fieldAccess.resultType match {
+      case Types.LONG =>
+        // access LONG field
+        val innerDiv = new CallExpression(
+          BuiltInFunctionDefinitions.DIVIDE,
+          List(fieldReferenceExpr,
+            new ValueLiteralExpression(new java.math.BigDecimal(1000),
+              DecimalTypeInfo.of(DecimalType.MAX_PRECISION, DecimalType.MAX_COMPACT_PRECISION)))
+        )
+        new CallExpression(
+          BuiltInFunctionDefinitions.CAST,
+          List(innerDiv, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+      case Types.SQL_TIMESTAMP =>
+        fieldReferenceExpr
+      case Types.STRING =>
+        new CallExpression(
+          BuiltInFunctionDefinitions.CAST,
+          List(fieldReferenceExpr, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+    }
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ExistingField => field == that.field
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    field.hashCode
+  }
+
+  override def toProperties: util.Map[String, String] = {
+    val javaMap = new util.HashMap[String, String]()
+    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
+    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
+    javaMap
+  }
+}
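
The extractor is a value object keyed on the field name, so equal configurations
compare equal; a small sketch:

    val extractor = new ExistingField("ts")
    assert(extractor.getArgumentFields.sameElements(Array("ts")))
    assert(extractor == new ExistingField("ts"))  // equality is by field name only
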
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
new file mode 100644
index 0000000..8dbe0e1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A watermark strategy for ascending rowtime attributes.
+  *
+  * Emits a watermark of the maximum observed timestamp so far minus 1.
+  * Rows that have a timestamp equal to the max timestamp are not late.
+  */
+final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+  var maxTimestamp: Long = Long.MinValue + 1
+
+  override def nextTimestamp(timestamp: Long): Unit = {
+    if (timestamp > maxTimestamp) {
+      maxTimestamp = timestamp
+    }
+  }
+
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+
+  override def equals(obj: Any): Boolean = obj match {
+    case _: AscendingTimestamps => true
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    classOf[AscendingTimestamps].hashCode()
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
new file mode 100644
index 0000000..088ac80
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.descriptors.Rowtime
+
+import java.util
+
+/** A periodic watermark assigner. */
+abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
+
+  /**
+    * Updates the assigner with the next timestamp.
+    *
+    * @param timestamp The next timestamp to update the assigner.
+    */
+  def nextTimestamp(timestamp: Long): Unit
+
+  /**
+    * Returns the current watermark.
+    *
+    * @return The current watermark.
+    */
+  def getWatermark: Watermark
+}
+
+/** A punctuated watermark assigner. */
+abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
+
+  /**
+    * Returns the watermark for the current row or null if no watermark should be generated.
+    *
+    * @param row The current row.
+    * @param timestamp The value of the timestamp attribute for the row.
+    * @return The watermark for this row or null if no watermark should be generated.
+    */
+  def getWatermark(row: BaseRow, timestamp: Long): Watermark
+}
+
+/** A strategy indicating that watermarks should be preserved as emitted by the underlying DataStream. */
+final class PreserveWatermarks extends WatermarkStrategy {
+
+  override def equals(obj: scala.Any): Boolean = {
+    obj match {
+      case _: PreserveWatermarks => true
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    classOf[PreserveWatermarks].hashCode()
+  }
+
+  override def toProperties: util.Map[String, String] = {
+    val javaMap = new util.HashMap[String, String]()
+    javaMap.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+    javaMap
+  }
+}
+
+object PreserveWatermarks {
+  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
+}
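
A hedged sketch of a custom strategy on top of these abstract classes: a punctuated
assigner that emits a watermark for every row (illustrative only, not part of the
commit):

    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.table.dataformat.BaseRow

    final class PerRowWatermarks extends PunctuatedWatermarkAssigner {
      // emit a watermark equal to each row's timestamp attribute value
      override def getWatermark(row: BaseRow, timestamp: Long): Watermark =
        new Watermark(timestamp)
    }
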
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
new file mode 100644
index 0000000..1e6c3cf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.util.TestTableSourceWithTime
+import org.apache.flink.types.Row
+
+import org.junit.Test
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.sql.Timestamp
+
+import scala.collection.JavaConversions._
+
+class TableScanITCase extends BatchTestBase {
+
+  @Test
+  def testTableSourceWithoutTimeAttribute(): Unit = {
+    val tableName = "MyTable"
+
+    val tableSource = new BatchTableSource[Row]() {
+      private val fieldNames: Array[String] = Array("name", "id", "value")
+      private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+
+      override def getBoundedStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+        val data = Seq(
+          row("Mary", new JLong(1L), new JInt(1)),
+          row("Bob", new JLong(2L), new JInt(3))
+        )
+        execEnv.fromCollection(data).returns(getReturnType)
+      }
+
+      override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+      override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+    }
+    tEnv.registerTableSource(tableName, tableSource)
+
+    checkResult(
+      s"SELECT * from $tableName",
+      Seq(
+        row("Mary", 1L, 1),
+        row("Bob", 2L, 3))
+    )
+  }
+
+  @Test
+  def testProctimeTableSource(): Unit = {
+    val tableName = "MyTable"
+    val data = Seq("Mary", "Peter", "Bob", "Liz")
+    val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+    val returnType = Types.STRING
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+
+    checkResult(
+      s"SELECT name FROM $tableName",
+      Seq(
+        row("Mary"),
+        row("Peter"),
+        row("Bob"),
+        row("Liz"))
+    )
+  }
+
+  @Test
+  def testRowtimeTableSource(): Unit = {
+    val tableName = "MyTable"
+    val data = Seq(
+      row("Mary", new Timestamp(1L), new JInt(10)),
+      row("Bob", new Timestamp(2L), new JInt(20)),
+      row("Mary", new Timestamp(2L), new JInt(30)),
+      row("Liz", new Timestamp(2001L), new JInt(40)))
+
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+    tEnv.registerTableSource(tableName, tableSource)
+
+    checkResult(
+      s"SELECT * FROM $tableName",
+      Seq(
+        row("Mary", new Timestamp(1L), new JInt(10)),
+        row("Mary", new Timestamp(2L), new JInt(30)),
+        row("Bob", new Timestamp(2L), new JInt(20)),
+        row("Liz", new Timestamp(2001L), new JInt(40)))
+    )
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
new file mode 100644
index 0000000..67a2f59
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.util.{TestPreserveWMTableSource, TestTableSourceWithTime}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+import java.lang.{Integer => JInt, Long => JLong}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+class TableScanITCase extends StreamingTestBase {
+
+  @Test
+  def testTableSourceWithoutTimeAttribute(): Unit = {
+    val tableName = "MyTable"
+
+    val tableSource = new StreamTableSource[Row]() {
+      private val fieldNames: Array[String] = Array("name", "id", "value")
+      private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+
+      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+        val data = Seq(
+          Row.of("Mary", new JLong(1L), new JInt(1)),
+          Row.of("Bob", new JLong(2L), new JInt(3))
+        )
+        execEnv.fromCollection(data).returns(getReturnType)
+      }
+
+      override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+      override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+    }
+    tEnv.registerTableSource(tableName, tableSource)
+    val sqlQuery = s"SELECT * from $tableName"
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = Seq("Mary,1,1", "Bob,2,3")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProctimeTableSource(): Unit = {
+    val tableName = "MyTable"
+
+    val data = Seq("Mary", "Peter", "Bob", "Liz")
+
+    val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+    val returnType = Types.STRING
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT name FROM $tableName"
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = Seq("Mary", "Peter", "Bob", "Liz")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowtimeTableSourcePreserveWatermarks(): Unit = {
+    val tableName = "MyTable"
+
+    // rows with timestamps and watermarks
+    val data = Seq(
+      Right(1L),
+      Left(5L, Row.of(new JInt(1), new JLong(5), "A")),
+      Left(2L, Row.of(new JInt(2), new JLong(1), "B")),
+      Right(10L),
+      Left(8L, Row.of(new JInt(6), new JLong(8), "C")),
+      Right(20L),
+      Left(21L, Row.of(new JInt(6), new JLong(21), "D")),
+      Right(30L)
+    )
+
+    val fieldNames = Array("id", "rtime", "name")
+    val schema = new TableSchema(fieldNames, Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
+    val rowType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestPreserveWMTableSource(schema, rowType, data, "rtime")
+    tEnv.registerTableSource(tableName, tableSource)
+    val sqlQuery = s"SELECT id, name FROM $tableName"
+    val sink = new TestingAppendSink
+
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+      // append current watermark to each row to verify that original watermarks were preserved
+      .process(new ProcessFunction[Row, Row] {
+
+      override def processElement(
+          value: Row,
+          ctx: ProcessFunction[Row, Row]#Context,
+          out: Collector[Row]): Unit = {
+        val res = new Row(3)
+        res.setField(0, value.getField(0))
+        res.setField(1, value.getField(1))
+        res.setField(2, ctx.timerService().currentWatermark())
+        out.collect(res)
+      }
+    }).addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,A,1", "2,B,1", "6,C,10", "6,D,20")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
new file mode 100644
index 0000000..c40f457
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.tsextractors.ExistingField
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConversions._
+
+class TestTableSourceWithTime[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T],
+    rowtime: String = null,
+    proctime: String = null,
+    mapping: Map[String, String] = null)
+  extends StreamTableSource[T]
+  with BatchTableSource[T]
+  with DefinedRowtimeAttributes
+  with DefinedProctimeAttribute
+  with DefinedFieldMapping {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    execEnv.fromCollection(values, returnType)
+  }
+
+  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[T] = {
+    streamEnv.fromCollection(values, returnType)
+  }
+
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    // return a RowtimeAttributeDescriptor if a rowtime attribute is defined
+    if (rowtime != null) {
+      Collections.singletonList(new RowtimeAttributeDescriptor(
+        rowtime,
+        new ExistingField(rowtime),
+        new AscendingTimestamps))
+    } else {
+      Collections.EMPTY_LIST.asInstanceOf[util.List[RowtimeAttributeDescriptor]]
+    }
+  }
+
+  override def getProctimeAttribute: String = proctime
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+
+  override def explainSource(): String = ""
+
+  override def getFieldMapping: util.Map[String, String] = {
+    if (mapping != null) mapping else null
+  }
+}
+
+class TestPreserveWMTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[Either[(Long, T), Long]],
+    rowtime: String)
+  extends StreamTableSource[T]
+  with DefinedRowtimeAttributes {
+
+  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+    Collections.singletonList(new RowtimeAttributeDescriptor(
+      rowtime,
+      new ExistingField(rowtime),
+      PreserveWatermarks.INSTANCE))
+  }
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    execEnv.addSource(new EventTimeSourceFunction[T](values)).setParallelism(1).returns(returnType)
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+
+  override def explainSource(): String = ""
+
+}