Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/05 07:09:47 UTC

flink git commit: [FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.

Repository: flink
Updated Branches:
  refs/heads/master 5f1ec4af5 -> dae21da7e


[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.

This closes #4710.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dae21da7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dae21da7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dae21da7

Branch: refs/heads/master
Commit: dae21da7eb1cee1f61f56bc2d1049156333a6a7f
Parents: 5f1ec4a
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 11 00:05:06 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 4 23:28:10 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  66 ++++++++-
 docs/dev/table/streaming.md                     |  36 +++--
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/codegen/CodeGenerator.scala     |  26 +++-
 .../datastream/StreamTableSourceScan.scala      |  37 +----
 .../logical/FlinkLogicalTableSourceScan.scala   |  41 ++----
 .../PushProjectIntoTableSourceScanRule.scala    |   8 +-
 .../plan/schema/StreamTableSourceTable.scala    |  64 +++++----
 .../table/sources/definedTimeAttributes.scala   |  21 ++-
 .../api/stream/table/TableSourceTest.scala      | 138 +++++++++++++++---
 .../validation/TableSourceValidationTest.scala  |  13 +-
 .../validation/TableSourceValidationTest.scala  |  92 +++++++++++-
 .../runtime/stream/TimeAttributesITCase.scala   |  70 +++------
 .../stream/table/TableSourceITCase.scala        | 144 ++++++++++++++++++-
 .../table/utils/TestTableSourceWithTime.scala   |  52 +++++++
 15 files changed, 619 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7163c67..f77625b 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -387,14 +387,70 @@ StreamTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-**Note:** If a Table needs to be processed in event-time, the `DataStream` returned by the `getDataStream()` method must carry timestamps and watermarks. Please see the documentation on [timestamp and watermark assignment]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) for details on how to assign timestamps and watermarks.
+**IMPORTANT:** Time-based operations on streaming tables such as windows require explicitly specified [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes) (both for the [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows)). A `StreamTableSource` defines 
 
-**Note:** Time-based operations on streaming tables such as windows in both the [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows) require explicitly specified time attributes. 
+- an *event-time attribute* by implementing the `DefinedRowtimeAttribute` interface and
+- a *processing-time attribute* by implementing the `DefinedProctimeAttribute` interface.
 
-- `DefinedRowtimeAttribute` provides the `getRowtimeAttribute()` method to specify the name of the event-time time attribute.
-- `DefinedProctimeAttribute` provides the `getProctimeAttribute()` method to specify the name of the processing-time time attribute.
+Both are described in the following sections.
 
-Please see the documentation on [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes) for details.
+#### DefinedRowtimeAttribute
+
+The `DefinedRowtimeAttribute` interface provides a single method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DefinedRowtimeAttribute {
+
+  public String getRowtimeAttribute();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+DefinedRowtimeAttribute {
+
+  def getRowtimeAttribute(): String
+}
+{% endhighlight %}
+</div>
+</div>
+
+The `getRowtimeAttribute()` method returns the name of the field that holds the event-time timestamps for the rows of the table. The field must exist in the schema of the `StreamTableSource` and be of type `LONG` or `TIMESTAMP`. Moreover, the `DataStream` returned by `StreamTableSource.getDataStream()` must have [watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) assigned which are aligned with the values of the specified timestamp field. 
+
+Please see the documentation on [timestamp and watermark assignment]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) for details on how to assign watermarks. Please note that the timestamps of a `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime field are relevant.
+
+**Note:** A `TableSource` that returns a rowtime attribute does not support projection pushdown.
+
+#### DefinedProctimeAttribute
+
+The `DefinedProctimeAttribute` interface provides a single method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DefinedProctimeAttribute {
+
+  public String getProctimeAttribute();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+DefinedProctimeAttribute {
+
+  def getProctimeAttribute(): String
+}
+{% endhighlight %}
+</div>
+</div>
+
+The `getProctimeAttribute()` method returns the name of a field that is appended to each row returned by the `StreamTableSource`. The appended field serves as a processing time timestamp and can be used in time-based operations.
+
+**Note:** A `TableSource` that returns a processing time attribute does not support projection pushdown.
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index cb77af8..91915c7 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -336,7 +336,7 @@ val windowedTable = tEnv
 
 ### Event time
 
-Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage. 
+Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
 
 Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
 
@@ -344,19 +344,16 @@ In order to handle out-of-order events and distinguish between on-time and late
 
 An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource. 
 
-The Table API & SQL assume that in both cases timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a `TableSource` with knowledge about the incoming data's characteristics and is hidden from the end user of the API.
-
-
 #### During DataStream-to-Table Conversion
 
-The event time attribute is defined with the `.rowtime` property during schema definition. 
+The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{ site.baseurl }}/dev/event_time.html) must have been assigned in the `DataStream` that is converted.
 
-Timestamps and watermarks must have been assigned in the `DataStream` that is converted.
+There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream` or not, the timestamp field is either 
 
-There are two ways of defining the time attribute when converting a `DataStream` into a `Table`:
+- appended as a new field to the schema or
+- substituted for an existing field.
 
-- Extending the physical schema by an additional logical field
-- Replacing a physical field by a logical field (e.g. because it is no longer needed after timestamp extraction).
+In either case the event time timestamp field will hold the value of the `DataStream` event time timestamp.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -415,9 +412,9 @@ val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'u
 
 #### Using a TableSource
 
-The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
+The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttribute` interface. The `getRowtimeAttribute()` method returns the name of an existing field that carries the event time attribute of the table and is of type `LONG` or `TIMESTAMP`.
 
-Timestamps and watermarks must be assigned in the stream that is returned by the `getDataStream()` method.
+Moreover, the `DataStream` returned by the `getDataStream()` method must have watermarks assigned that are aligned with the defined time attribute. Please note that the timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime attribute are relevant.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -427,8 +424,9 @@ public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeA
 
 	@Override
 	public TypeInformation<Row> getReturnType() {
-		String[] names = new String[] {"Username" , "Data"};
-		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
+		String[] names = new String[] {"Username", "Data", "UserActionTime"};
+		TypeInformation[] types = 
+		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
 		return Types.ROW(names, types);
 	}
 
@@ -436,14 +434,14 @@ public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeA
 	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
 		// create stream 
 		// ...
-		// extract timestamp and assign watermarks based on knowledge of the stream
+		// assign watermarks based on the "UserActionTime" attribute
 		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
 		return stream;
 	}
 
 	@Override
 	public String getRowtimeAttribute() {
-		// field with this name will be appended as a third field 
+		// Mark the "UserActionTime" attribute as event-time attribute.
 		return "UserActionTime";
 	}
 }
@@ -462,21 +460,21 @@ WindowedTable windowedTable = tEnv
 class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
 
 	override def getReturnType = {
-		val names = Array[String]("Username" , "Data")
-		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
+		val names = Array[String]("Username" , "Data", "UserActionTime")
+		val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
 		Types.ROW(names, types)
 	}
 
 	override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
 		// create stream 
 		// ...
-		// extract timestamp and assign watermarks based on knowledge of the stream
+		// assign watermarks based on the "UserActionTime" attribute
 		val stream = inputStream.assignTimestampsAndWatermarks(...)
 		stream
 	}
 
 	override def getRowtimeAttribute = {
-		// field with this name will be appended as a third field
+		// Mark the "UserActionTime" attribute as event-time attribute.
 		"UserActionTime"
 	}
 }
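
Note on the documentation change above: the updated text requires that the watermarks of the returned `DataStream` follow the values of the rowtime field ("UserActionTime" in the example) and states that timestamps assigned by a `TimestampAssigner` are ignored. The following is a minimal stand-alone sketch of that idea in plain Java, assuming a bounded out-of-orderness policy. The class `FieldWatermarkSketch` and its methods are hypothetical names chosen for illustration; they are not part of Flink, and a real source would express this through `assignTimestampsAndWatermarks(...)`.

```java
/** Hypothetical model of field-aligned watermarks; not a Flink class. */
final class FieldWatermarkSketch {

    // Assumed tolerance for out-of-order rows, in milliseconds.
    private final long maxOutOfOrdernessMs;

    // Largest rowtime field value observed so far.
    private long maxTimestampMs = Long.MIN_VALUE;

    FieldWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /**
     * Observes the value of the rowtime field of one row and returns the
     * watermark derived from the field values seen so far.
     */
    long onRow(long rowtimeFieldMs) {
        maxTimestampMs = Math.max(maxTimestampMs, rowtimeFieldMs);
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        FieldWatermarkSketch tracker = new FieldWatermarkSketch(1000L);
        // Feed "UserActionTime" values; an out-of-order row does not move the watermark back.
        for (long ts : new long[] {5000L, 4500L, 7000L}) {
            System.out.println("row=" + ts + " watermark=" + tracker.onRow(ts));
        }
    }
}
```

The watermark depends only on the field values passed to `onRow`, which mirrors the requirement that the rowtime field, not the record timestamp, drives event time.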

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c7cc61b..e42beae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -111,7 +111,7 @@ abstract class StreamTableEnvironment(
 
     // check if event-time is enabled
     tableSource match {
-      case dra: DefinedRowtimeAttribute if
+      case dra: DefinedRowtimeAttribute if dra.getRowtimeAttribute != null &&
           execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime =>
 
         throw TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index bf6ee21..3fead21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -255,11 +255,31 @@ abstract class CodeGenerator(
         generateRowtimeAccess()
       case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
         // attribute is proctime indicator.
-        // We use a null literal and generate a timestamp when we need it.
+        // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case idx =>
-        // regular attribute. Access attribute in input data type.
-        generateInputAccess(input1, input1Term, idx)
+        // get type of result field
+        val outIdx = input1Mapping.indexOf(idx)
+        val outType = returnType match {
+          case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
+          case ct: CompositeType[_] => ct.getTypeAt(outIdx)
+          case t: TypeInformation[_] => t
+        }
+        val inputAccess = generateInputAccess(input1, input1Term, idx)
+        // Change output type to rowtime indicator
+        if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
+          (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) {
+          // This case is required for TableSources that implement DefinedRowtimeAttribute.
+          // Hard cast possible because LONG, TIMESTAMP, and ROWTIME_INDICATOR are internally
+          // represented as Long.
+          GeneratedExpression(
+            inputAccess.resultTerm,
+            inputAccess.nullTerm,
+            inputAccess.code,
+            TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+        } else {
+          inputAccess
+        }
     }
 
     val input2AccessExprs = input2 match {

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index c7a423e..937f9c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -20,16 +20,15 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable}
-import org.apache.flink.table.sources._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -40,35 +39,11 @@ class StreamTableSourceScan(
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
   with StreamScan {
 
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+  override def deriveRowType(): RelDataType = {
 
-    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-
-    val fields = fieldNames.zip(fieldTypes)
-
-    val withRowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
-        val rowtimeAttribute = timeSource.getRowtimeAttribute
-        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
-      case _ =>
-        fields
-    }
-
-    val withProctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
-        val proctimeAttribute = timeSource.getProctimeAttribute
-        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
-      case _ =>
-        withRowtime
-    }
-
-    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip
-
-    flinkTypeFactory.buildLogicalRowType(
-      fieldNamesWithIndicators,
-      fieldTypesWithIndicators)
+    StreamTableSourceTable.deriveRowTypeOfTableSource(
+      tableSource.asInstanceOf[StreamTableSource[_]],
+      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 470d006..b009cf1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -25,12 +25,11 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
 
 import scala.collection.JavaConverters._
 
@@ -47,34 +46,18 @@ class FlinkLogicalTableSourceScan(
   }
 
   override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-    val fields = fieldNames.zip(fieldTypes)
-
-    val withRowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
-        val rowtimeAttribute = timeSource.getRowtimeAttribute
-        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
-      case _ =>
-        fields
-    }
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 
-    val withProctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
-        val proctimeAttribute = timeSource.getProctimeAttribute
-        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
-      case _ =>
-        withRowtime
+    tableSource match {
+      case s: StreamTableSource[_] =>
+        StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory)
+      case _: BatchTableSource[_] =>
+        val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+        val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+        flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
+      case _ => throw new TableException("Unknown TableSource type.")
     }
-
-    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip
-
-    flinkTypeFactory.buildLogicalRowType(
-      fieldNamesWithIndicators,
-      fieldTypesWithIndicators)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 99a6927..503badc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -20,10 +20,9 @@ package org.apache.flink.table.plan.rules.logical
 
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
-import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, NestedFieldsProjectableTableSource, ProjectableTableSource}
 
 class PushProjectIntoTableSourceScanRule extends RelOptRule(
   operand(classOf[FlinkLogicalCalc],
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
     scan.tableSource match {
+      // projection pushdown is not supported for sources that provide time indicators
+      case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
+      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => false
       case _: ProjectableTableSource[_] => true
       case _ => false
     }
@@ -45,7 +47,7 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
 
     // if no fields can be projected, we keep the original plan.
     val source = scan.tableSource
-    if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
+    if (scan.getRowType.getFieldCount != usedFields.length) {
 
       val newTableSource = source match {
         case nested: NestedFieldsProjectableTableSource[_] =>
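
Note on the rule change above: `matches` now rejects any source that returns a non-null rowtime or proctime attribute before it checks whether the source is projectable. The decision can be sketched in stand-alone Java as follows. The interfaces `RowtimeSourceSketch`, `ProctimeSourceSketch`, and `ProjectableSourceSketch` are hypothetical stand-ins for `DefinedRowtimeAttribute`, `DefinedProctimeAttribute`, and `ProjectableTableSource`; they only model the shape of the check and are not the Flink types.

```java
/** Hypothetical stand-ins for the interfaces named in the rule; not the Flink types. */
interface RowtimeSourceSketch { String getRowtimeAttribute(); }
interface ProctimeSourceSketch { String getProctimeAttribute(); }
interface ProjectableSourceSketch { }

final class PushdownSketch {

    /** Example source that is projectable and may or may not define a rowtime attribute. */
    static final class TimedSource implements ProjectableSourceSketch, RowtimeSourceSketch {
        private final String rowtime;

        TimedSource(String rowtime) {
            this.rowtime = rowtime;
        }

        @Override
        public String getRowtimeAttribute() {
            return rowtime;
        }
    }

    /** Mirrors the order of the cases in the Scala match: time attributes are checked first. */
    static boolean supportsProjectionPushdown(Object source) {
        if (source instanceof RowtimeSourceSketch
                && ((RowtimeSourceSketch) source).getRowtimeAttribute() != null) {
            return false;
        }
        if (source instanceof ProctimeSourceSketch
                && ((ProctimeSourceSketch) source).getProctimeAttribute() != null) {
            return false;
        }
        return source instanceof ProjectableSourceSketch;
    }

    public static void main(String[] args) {
        System.out.println(supportsProjectionPushdown(new TimedSource("UserActionTime")));
        System.out.println(supportsProjectionPushdown(new TimedSource(null)));
    }
}
```

A source that implements the time-attribute interface but returns null is still eligible, which matches the null guards in the Scala cases.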

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index 5553797..e94b4f2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 class StreamTableSourceTable[T](
@@ -49,42 +49,28 @@ class StreamTableSourceTable[T](
 object StreamTableSourceTable {
 
   private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
-    val (rowtime, proctime) = getTimeIndicators(tableSource)
+    val (_, proctime) = getTimeIndicators(tableSource)
 
     val original = TableEnvironment.getFieldIndices(tableSource)
 
-    // append rowtime marker
-    val withRowtime = if (rowtime.isDefined) {
-      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
-    } else {
-      original
-    }
-
     // append proctime marker
     if (proctime.isDefined) {
-      withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_MARKER
+      original :+ TimeIndicatorTypeInfo.PROCTIME_MARKER
     } else {
-      withRowtime
+      original
     }
   }
 
   private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = {
-    val (rowtime, proctime) = getTimeIndicators(tableSource)
+    val (_, proctime) = getTimeIndicators(tableSource)
 
     val original = TableEnvironment.getFieldNames(tableSource)
 
-    // append rowtime field
-    val withRowtime = if (rowtime.isDefined) {
-      original :+ rowtime.get
-    } else {
-      original
-    }
-
     // append proctime field
     if (proctime.isDefined) {
-      withRowtime :+ proctime.get
+      original :+ proctime.get
     } else {
-      withRowtime
+      original
     }
   }
 
@@ -93,9 +79,11 @@ object StreamTableSourceTable {
 
     val original = TableEnvironment.getFieldTypes(tableSource.getReturnType)
 
-    // append rowtime type
+    // update rowtime type
     val withRowtime = if (rowtime.isDefined) {
-      original :+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      // replace field type by RowtimeIndicator type
+      val rowtimeIdx = TableEnvironment.getFieldNames(tableSource).indexOf(rowtime.get)
+      original.patch(rowtimeIdx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1)
     } else {
       original
     }
@@ -112,13 +100,31 @@ object StreamTableSourceTable {
 
   private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], Option[String]) = {
 
+    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
     val rowtime: Option[String] = tableSource match {
       case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null =>
         None
       case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") =>
         throw TableException("The name of the rowtime attribute must not be empty.")
+
       case timeSource: DefinedRowtimeAttribute =>
+        // validate the rowtime field exists and is of type Long or Timestamp
         val rowtimeAttribute = timeSource.getRowtimeAttribute
+        val rowtimeIdx = fieldNames.indexOf(rowtimeAttribute)
+
+        if (rowtimeIdx < 0) {
+          throw TableException(
+            s"Rowtime field '$rowtimeAttribute' is not present in TableSource. " +
+            s"Available fields are ${fieldNames.mkString("[", ", ", "]") }.")
+        }
+        val fieldType = fieldTypes(rowtimeIdx)
+        if (fieldType != Types.LONG && fieldType != Types.SQL_TIMESTAMP) {
+          throw TableException(
+            s"Rowtime field '$rowtimeAttribute' must be of type Long or Timestamp " +
+            s"but is of type ${fieldTypes(rowtimeIdx)}.")
+        }
         Some(rowtimeAttribute)
       case _ =>
         None
@@ -138,4 +144,14 @@ object StreamTableSourceTable {
     }
     (rowtime, proctime)
   }
+
+  def deriveRowTypeOfTableSource(
+    tableSource: StreamTableSource[_],
+    typeFactory: FlinkTypeFactory): RelDataType = {
+
+    val fieldNames = adjustFieldNames(tableSource)
+    val fieldTypes = adjustFieldTypes(tableSource)
+
+    typeFactory.buildLogicalRowType(fieldNames, fieldTypes)
+  }
 }
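
Note on the schema derivation above: after this change the rowtime attribute must name an existing field of type Long or Timestamp, whose type is replaced in place by the rowtime indicator, while the proctime attribute is still appended as an extra field. The stand-alone Java sketch below models that behavior with plain strings in place of Flink's type information. `RowtimeSchemaSketch` and all of its members are hypothetical names for illustration; the check for an empty attribute name from the original code is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone model; none of these names are Flink classes. */
final class RowtimeSchemaSketch {

    // Stand-ins for Flink's time indicator types, reduced to plain strings.
    static final String ROWTIME_INDICATOR = "ROWTIME_INDICATOR";
    static final String PROCTIME_INDICATOR = "PROCTIME_INDICATOR";

    /** Returns the index of the rowtime field, or throws if it is missing or has the wrong type. */
    static int validateRowtime(List<String> names, List<String> types, String rowtime) {
        int idx = names.indexOf(rowtime);
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Rowtime field '" + rowtime + "' is not present. Available fields are " + names + ".");
        }
        String type = types.get(idx);
        if (!type.equals("LONG") && !type.equals("TIMESTAMP")) {
            throw new IllegalArgumentException(
                "Rowtime field '" + rowtime + "' must be of type LONG or TIMESTAMP but is of type "
                    + type + ".");
        }
        return idx;
    }

    /** Derives the logical field types: rowtime replaces a type in place, proctime is appended. */
    static List<String> deriveTypes(
            List<String> names, List<String> types, String rowtime, String proctime) {
        List<String> result = new ArrayList<>(types);
        if (rowtime != null) {
            result.set(validateRowtime(names, types, rowtime), ROWTIME_INDICATOR);
        }
        if (proctime != null) {
            result.add(PROCTIME_INDICATOR);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Username", "Data", "UserActionTime");
        List<String> types = Arrays.asList("STRING", "STRING", "LONG");
        System.out.println(deriveTypes(names, types, "UserActionTime", null));
    }
}
```

With a rowtime attribute only, the number of fields stays the same as in the source schema, which is the main behavioral difference from the previous append-based logic.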

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
index d381115..babd815 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -24,16 +24,27 @@ package org.apache.flink.table.sources
   * event-time.
   *
   * A [[TableSource]] that implements this interface defines the name of
-  * the event-time attribute. The attribute will be added to the schema of the
-  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+  * the event-time attribute. The attribute must be present in the schema of the [[TableSource]]
+  * and must be of type [[Long]] or [[java.sql.Timestamp]].
   */
 trait DefinedRowtimeAttribute {
 
   /**
-    * Defines a name of the event-time attribute that represents Flink's
-    * event-time. Null if no rowtime should be available.
+    * Defines the name of the event-time attribute that represents Flink's event-time, i.e., an
+    * attribute that is aligned with the watermarks of the
+    * [[org.apache.flink.streaming.api.datastream.DataStream]] returned by
+    * [[StreamTableSource.getDataStream()]].
     *
-    * The field will be appended to the schema provided by the [[TableSource]].
+    * An attribute with the given name must be present in the schema of the [[TableSource]].
+    * The attribute must be of type [[Long]] or [[java.sql.Timestamp]].
+    *
+    * The method should return null if no rowtime attribute is defined.
+    *
+    * @return The name of the field that represents the event-time field and which is aligned
+    *         with the watermarks of the [[org.apache.flink.streaming.api.datastream.DataStream]]
+    *         returned by [[StreamTableSource.getDataStream()]].
+    *         The field must be present in the schema of the [[TableSource]] and be of type [[Long]]
+    *         or [[java.sql.Timestamp]].
     */
   def getRowtimeAttribute: String
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index 6967061..be073bd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -24,38 +24,78 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
+import org.apache.flink.table.sources._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
-import org.junit.Test
+import org.junit.{Assert, Test}
 
 class TableSourceTest extends TableTestBase {
 
   @Test
-  def testRowTimeTableSourceSimple(): Unit = {
+  def testTableSourceWithLongRowTimeField(): Unit = {
+
+    val tableSource = new TestRowtimeSource(
+      Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      "rowtime"
+    )
+
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+    util.tableEnv.registerTableSource("rowTimeT", tableSource)
 
-    val t = util.tableEnv.scan("rowTimeT").select("addTime, id, name, val")
+    val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
 
     val expected =
       unaryNode(
         "DataStreamCalc",
-        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
-        term("select", "addTime", "id", "name", "val")
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
+        term("select", "rowtime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testTableSourceWithTimestampRowTimeField(): Unit = {
+
+    val tableSource = new TestRowtimeSource(
+      Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      "rowtime"
+    )
+
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", tableSource)
+
+    val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
+        term("select", "rowtime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
 
   @Test
   def testRowTimeTableSourceGroupWindow(): Unit = {
+
+    val tableSource = new TestRowtimeSource(
+      Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      "rowtime"
+    )
+
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+    util.tableEnv.registerTableSource("rowTimeT", tableSource)
 
     val t = util.tableEnv.scan("rowTimeT")
       .filter("val > 100")
-      .window(Tumble over 10.minutes on 'addTime as 'w)
+      .window(Tumble over 10.minutes on 'rowtime as 'w)
       .groupBy('name, 'w)
       .select('name, 'w.end, 'val.avg)
 
@@ -66,12 +106,12 @@ class TableSourceTest extends TableTestBase {
           "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
-            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
-            term("select", "name", "val", "addTime"),
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
+            term("select", "name", "val", "rowtime"),
             term("where", ">(val, 100)")
           ),
           term("groupBy", "name"),
-          term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"),
+          term("window", "TumblingGroupWindow('w, 'rowtime, 600000.millis)"),
           term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0")
         ),
         term("select", "name", "TMP_0", "TMP_1")
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
       )
     util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+    // ensures that projection is not pushed into table source with proctime indicators
+    val util = streamTestUtil()
+
+    val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+      override def projectFields(fields: Array[Int]): TableSource[Row] = {
+        // ensure this method is not called!
+        Assert.fail()
+        null.asInstanceOf[TableSource[Row]]
+      }
+    }
+    util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+    val t = util.tableEnv.scan("PTimeTable")
+      .select('name, 'val)
+      .where('val > 10)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+        term("select", "name", "val"),
+        term("where", ">(val, 10)")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
+    // ensures that projection is not pushed into table source with rowtime indicators
+    val util = streamTestUtil()
+
+    val projectableTableSource = new TestRowtimeSource(
+        Array("id", "rowtime", "val", "name"),
+        Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+          .asInstanceOf[Array[TypeInformation[_]]],
+        "rowtime") with ProjectableTableSource[Row] {
+
+      override def projectFields(fields: Array[Int]): TableSource[Row] = {
+        // ensure this method is not called!
+        Assert.fail()
+        null.asInstanceOf[TableSource[Row]]
+      }
+    }
+    util.tableEnv.registerTableSource("RTimeTable", projectableTableSource)
+
+    val t = util.tableEnv.scan("RTimeTable")
+      .select('name, 'val)
+      .where('val > 10)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[RTimeTable]], fields=[id, rowtime, val, name])",
+        term("select", "name", "val"),
+        term("where", ">(val, 10)")
+      )
+    util.verifyTable(t, expected)
+  }
 }
 
-class TestRowtimeSource(timeField: String)
-    extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+class TestRowtimeSource(
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]],
+    rowtimeField: String)
+  extends StreamTableSource[Row] with DefinedRowtimeAttribute {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
 
-  override def getRowtimeAttribute: String = timeField
+  override def getRowtimeAttribute: String = rowtimeField
 
   override def getReturnType: TypeInformation[Row] = {
-    new RowTypeInfo(
-      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "val", "name"))
+    new RowTypeInfo(fieldTypes, fieldNames)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
index dea55c2..80f1725 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.api.stream.table.validation
 
-import org.apache.flink.table.api.TableException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.stream.table.{TestProctimeSource, TestRowtimeSource}
 import org.apache.flink.table.utils.TableTestBase
@@ -28,8 +29,16 @@ class TableSourceValidationTest extends TableTestBase {
 
   @Test(expected = classOf[TableException])
   def testRowtimeTableSourceWithEmptyName(): Unit = {
+
+    val tableSource = new TestRowtimeSource(
+      Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      " "
+    )
+
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+    util.tableEnv.registerTableSource("rowTimeT", tableSource)
 
     val t = util.tableEnv.scan("rowTimeT")
             .select('id)

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index 49784be..09a9c55 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -18,8 +18,14 @@
 
 package org.apache.flink.table.api.validation
 
-import org.apache.flink.table.api.Types
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TestTableSourceWithTime
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class TableSourceValidationTest {
@@ -48,4 +54,88 @@ class TableSourceValidationTest {
       // should fail, field can be empty
       .build()
   }
+
+  @Test(expected = classOf[TableException])
+  def testNonExistingRowtimeField(): Unit = {
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "amount")
+    )
+    val ts = new TestTableSourceWithTime(
+      Seq[Row](),
+      rowType,
+      "rTime",
+      null
+    )
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // should fail because configured rowtime field is not in schema
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidTypeRowtimeField(): Unit = {
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "amount")
+    )
+    val ts = new TestTableSourceWithTime(
+      Seq[Row](),
+      rowType,
+      "name",
+      null
+    )
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // should fail because configured rowtime field is not of type Long or Timestamp
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testEmptyRowtimeField(): Unit = {
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "amount")
+    )
+    val ts = new TestTableSourceWithTime(
+      Seq[Row](),
+      rowType,
+      "",
+      null
+    )
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // should fail because configured rowtime field is empty
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testEmptyProctimeField(): Unit = {
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "amount")
+    )
+    val ts = new TestTableSourceWithTime(
+      Seq[Row](),
+      rowType,
+      null,
+      ""
+    )
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // should fail because configured proctime field is empty
+    tEnv.registerTableSource("testTable", ts)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 7b8b9e6..47a7341 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -19,16 +19,13 @@
 package org.apache.flink.table.runtime.stream
 
 import java.math.BigDecimal
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt, Long => JLong}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
@@ -39,13 +36,12 @@ import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.StreamITCase
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TestTableSourceWithTime
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
-import scala.collection.JavaConverters._
 
 /**
   * Tests for access and materialization of time attributes.
@@ -384,22 +380,37 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerTableSource("testTable", new TestTableSource)
+
+    val rows = Seq(
+      Row.of(new JInt(1), "A", new JLong(1000L)),
+      Row.of(new JInt(2), "B", new JLong(2000L)),
+      Row.of(new JInt(3), "C", new JLong(3000L)),
+      Row.of(new JInt(4), "D", new JLong(4000L)),
+      Row.of(new JInt(5), "E", new JLong(5000L)),
+      Row.of(new JInt(6), "F", new JLong(6000L)))
+    val rowType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
+      Array("a", "b", "rowtime")
+    )
+
+    tEnv.registerTableSource(
+      "testTable",
+      new TestTableSourceWithTime(rows, rowType, "rowtime", "proctime"))
     StreamITCase.clear
 
     val result = tEnv
       .scan("testTable")
       .where('a % 2 === 1)
-      .select('rowtime, 'a, 'b, 'c)
+      .select('rowtime, 'a, 'b)
       .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
-      "1970-01-01 00:00:01.0,1,A,1000",
-      "1970-01-01 00:00:03.0,3,C,3000",
-      "1970-01-01 00:00:05.0,5,E,5000")
+      "1970-01-01 00:00:01.0,1,A",
+      "1970-01-01 00:00:03.0,3,C",
+      "1970-01-01 00:00:05.0,5,E")
 
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
@@ -447,40 +458,3 @@ object TimeAttributesITCase {
     var c: String = _
   }
 }
-
-class TestTableSource
-    extends StreamTableSource[Row]
-    with DefinedRowtimeAttribute
-    with DefinedProctimeAttribute {
-
-  override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {
-
-    def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong])
-
-    val rows = Seq(
-      toRow(1, "A", 1000L),
-      toRow(2, "B", 2000L),
-      toRow(3, "C", 3000L),
-      toRow(4, "D", 4000L),
-      toRow(5, "E", 5000L),
-      toRow(6, "F", 6000L)
-    )
-
-    env
-      .fromCollection(rows.asJava).returns(getReturnType)
-      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
-        override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long]
-      })
-  }
-
-  override def getRowtimeAttribute: String = "rowtime"
-
-  override def getProctimeAttribute: String = "proctime"
-
-  override def getReturnType: TypeInformation[Row] = {
-    new RowTypeInfo(
-      Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
-      Array("a", "b", "c")
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 297e9cf..9e84607 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.runtime.stream.table
 
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
-import org.apache.flink.table.utils.TestFilterableTableSource
+import org.apache.flink.table.utils.{TestFilterableTableSource, TestTableSourceWithTime}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
+import java.lang.{Integer => JInt, Long => JLong}
 
 import scala.collection.mutable
 
@@ -77,4 +82,139 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testRowtimeTableSource(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("name", "rtime", "amount"))
+
+    tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, "rtime", null))
+
+    tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProctimeTableSource(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("name", "rtime", "amount"))
+
+    tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, null, "ptime"))
+
+    tEnv.scan(tableName)
+      .where('ptime.cast(Types.LONG) > 0L)
+      .select('name, 'amount)
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Mary,10",
+      "Bob,20",
+      "Mary,30",
+      "Liz,40")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowtimeProctimeTableSource(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("name", "rtime", "amount"))
+
+    tEnv.registerTableSource(
+      tableName,
+      new TestTableSourceWithTime(data, rowType, "rtime", "ptime"))
+
+    tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowtimeAsTimestampTableSource(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", toTimestamp(1L), new JInt(10)),
+      Row.of("Bob", toTimestamp(2L), new JInt(20)),
+      Row.of("Mary", toTimestamp(2L), new JInt(30)),
+      Row.of("Liz", toTimestamp(2001L), new JInt(40)))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("name", "rtime", "amount"))
+
+    tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, "rtime", null))
+
+    tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
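
The expected results of the rowtime window tests above follow from simple window arithmetic. The sketch below recomputes them without Flink, assuming tumbling windows aligned to epoch 0 and non-negative timestamps. The object name `TumbleSketch` and its methods are hypothetical helpers for illustration, not part of the Table API.

```scala
// Standalone sketch: assigns rows to 1-second tumbling windows and sums
// the amount per (name, window start), as the queries in the tests do.
object TumbleSketch {

  // Start of the tumbling window that contains `ts`.
  // Assumption: windows are aligned to epoch 0 and `ts` is non-negative.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Groups rows of (name, rowtime in ms, amount) by name and window start
  // and sums the amounts of each group.
  def sumPerWindow(
      rows: Seq[(String, Long, Int)],
      sizeMs: Long): Map[(String, Long), Int] = {
    rows
      .groupBy { case (name, ts, _) => (name, windowStart(ts, sizeMs)) }
      .map { case (key, group) => key -> group.map(_._3).sum }
  }

  def main(args: Array[String]): Unit = {
    // Same data as in the tests: (name, rtime, amount).
    val data = Seq(
      ("Mary", 1L, 10),
      ("Bob", 2L, 20),
      ("Mary", 2L, 30),
      ("Liz", 2001L, 40))

    sumPerWindow(data, 1000L).foreach { case ((name, start), sum) =>
      println(s"$name,$start,$sum")
    }
  }
}
```

Both rows for Mary fall into the window starting at 0 ms and sum to 40, while the row for Liz at 2001 ms falls into the window starting at 2000 ms. These correspond to the window starts 1970-01-01 00:00:00.0 and 1970-01-01 00:00:02.0 in the expected test output.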

http://git-wip-us.apache.org/repos/asf/flink/blob/dae21da7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala
new file mode 100644
index 0000000..7d6919e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+class TestTableSourceWithTime(
+    rows: Seq[Row],
+    rowType: RowTypeInfo,
+    rowtime: String,
+    proctime: String)
+  extends StreamTableSource[Row]
+    with DefinedRowtimeAttribute
+    with DefinedProctimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+
+    // The source deliberately does not assign timestamps and watermarks.
+    // If a rowtime field is configured, the field carries the timestamp.
+    // The FromElementsFunction sends out a Long.MaxValue watermark when all rows are emitted.
+    execEnv.fromCollection(rows.asJava, rowType)
+  }
+
+  override def getRowtimeAttribute: String = rowtime
+
+  override def getProctimeAttribute: String = proctime
+
+  override def getReturnType: TypeInformation[Row] = rowType
+}