Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:37 UTC

[03/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2224752..8eb9d40 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -20,36 +20,34 @@ package org.apache.flink.table.plan.nodes.datastream
 import java.util.{List => JList}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.types.Row
 
-import org.apache.flink.api.java.functions.NullByteKeySelector
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
-import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-
 class DataStreamOverAggregate(
     logicWindow: Window,
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputNode: RelNode,
-    rowRelDataType: RelDataType,
-    inputType: RelDataType)
+    schema: RowSchema,
+    inputSchema: RowSchema)
   extends SingleRel(cluster, traitSet, inputNode)
   with OverAggregate
   with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = rowRelDataType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
     new DataStreamOverAggregate(
@@ -57,8 +55,8 @@ class DataStreamOverAggregate(
       cluster,
       traitSet,
       inputs.get(0),
-      getRowType,
-      inputType)
+      schema,
+      inputSchema)
   }
 
   override def toString: String = {
@@ -72,14 +70,16 @@ class DataStreamOverAggregate(
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     super.explainTerms(pw)
-      .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
-      .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
-      .itemIf("rows", windowRange(logicWindow, overWindow, getInput), overWindow.isRows)
-      .itemIf("range", windowRange(logicWindow, overWindow, getInput), !overWindow.isRows)
+      .itemIf("partitionBy",
+        partitionToString(schema.logicalType, partitionKeys), partitionKeys.nonEmpty)
+      .item("orderBy",
+        orderingToString(schema.logicalType, overWindow.orderKeys.getFieldCollations))
+      .itemIf("rows", windowRange(logicWindow, overWindow, inputNode), overWindow.isRows)
+      .itemIf("range", windowRange(logicWindow, overWindow, inputNode), !overWindow.isRows)
       .item(
         "select", aggregationToString(
-          inputType,
-          getRowType,
+          inputSchema.logicalType,
+          schema.logicalType,
           namedAggregates))
   }
 
@@ -111,13 +111,13 @@ class DataStreamOverAggregate(
       false,
       inputDS.getType)
 
-    val timeType = inputType
+    val timeType = schema.logicalType
       .getFieldList
       .get(orderKey.getFieldIndex)
-      .getValue
+      .getType
 
     timeType match {
-      case _: ProcTimeType =>
+      case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
         // proc-time OVER window
         if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
           // unbounded OVER window
@@ -140,7 +140,8 @@ class DataStreamOverAggregate(
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
         }
-      case _: RowTimeType =>
+
+      case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) =>
         // row-time OVER window
         if (overWindow.lowerBound.isPreceding &&
           overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
@@ -158,17 +159,16 @@ class DataStreamOverAggregate(
             inputDS,
             isRowTimeType = true,
             isRowsClause = overWindow.isRows
-            )
+          )
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
         }
+
       case _ =>
         throw new TableException(
-          "Unsupported time type {$timeType}. " +
-            "OVER windows do only support RowTimeType and ProcTimeType.")
+          "OVER windows can only be applied on time attributes.")
     }
-
   }
 
   def createUnboundedAndCurrentRowOverWindow(
@@ -178,16 +178,20 @@ class DataStreamOverAggregate(
     isRowsClause: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
-    val partitionKeys: Array[Int] = overWindow.keys.toArray
-    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
-
-    // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
+      namedAggregate =>
+        new CalcitePair[AggregateCall, String](
+          schema.mapAggregateCall(namedAggregate.left),
+          namedAggregate.right)
+    }
 
     val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
       generator,
       namedAggregates,
-      inputType,
+      inputSchema.physicalType,
+      inputSchema.physicalTypeInfo,
+      inputSchema.physicalFieldTypeInfo,
       isRowTimeType,
       partitionKeys.nonEmpty,
       isRowsClause)
@@ -198,7 +202,7 @@ class DataStreamOverAggregate(
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(schema.physicalTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[Row]]
       }
@@ -207,13 +211,13 @@ class DataStreamOverAggregate(
         if (isRowTimeType) {
           inputDS.keyBy(new NullByteKeySelector[Row])
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(schema.physicalTypeInfo)
             .name(aggOpName)
             .asInstanceOf[DataStream[Row]]
         } else {
           inputDS
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(schema.physicalTypeInfo)
             .name(aggOpName)
             .asInstanceOf[DataStream[Row]]
         }
@@ -228,19 +232,26 @@ class DataStreamOverAggregate(
     isRowsClause: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
-    val partitionKeys: Array[Int] = overWindow.keys.toArray
-    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+    val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
+      namedAggregate =>
+        new CalcitePair[AggregateCall, String](
+          schema.mapAggregateCall(namedAggregate.left),
+          namedAggregate.right)
+    }
 
     val precedingOffset =
-      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
-
-    // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+      getLowerBoundary(
+        logicWindow,
+        overWindow,
+        input) + (if (isRowsClause) 1 else 0)
 
     val processFunction = AggregateUtil.createBoundedOverProcessFunction(
       generator,
       namedAggregates,
-      inputType,
+      inputSchema.physicalType,
+      inputSchema.physicalTypeInfo,
+      inputSchema.physicalFieldTypeInfo,
       precedingOffset,
       isRowsClause,
       isRowTimeType
@@ -251,7 +262,7 @@ class DataStreamOverAggregate(
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(schema.physicalTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[Row]]
       }
@@ -260,7 +271,7 @@ class DataStreamOverAggregate(
         inputDS
           .keyBy(new NullByteKeySelector[Row])
           .process(processFunction).setParallelism(1).setMaxParallelism(1)
-          .returns(rowTypeInfo)
+          .returns(schema.physicalTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[Row]]
       }
@@ -282,17 +293,18 @@ class DataStreamOverAggregate(
 
     s"over: (${
       if (!partitionKeys.isEmpty) {
-        s"PARTITION BY: ${partitionToString(inputType, partitionKeys)}, "
+        s"PARTITION BY: ${partitionToString(inputSchema.logicalType, partitionKeys)}, "
       } else {
         ""
       }
-    }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
+    }ORDER BY: ${orderingToString(inputSchema.logicalType,
+        overWindow.orderKeys.getFieldCollations)}, " +
       s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
-      s"${windowRange(logicWindow, overWindow, getInput)}, " +
+      s"${windowRange(logicWindow, overWindow, inputNode.asInstanceOf[DataStreamRel])}, " +
       s"select: (${
         aggregationToString(
-          inputType,
-          getRowType,
+          inputSchema.logicalType,
+          schema.logicalType,
           namedAggregates)
       }))"
   }
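
For readers following the change: the dispatch above no longer matches on the removed
ProcTimeType/RowTimeType marker types but applies predicate checks to the order key's
RelDataType. A minimal sketch of the new dispatch, using only calls that appear in the
hunk (the method name windowKind is illustrative):

    import org.apache.calcite.rel.`type`.RelDataType
    import org.apache.flink.table.api.TableException
    import org.apache.flink.table.calcite.FlinkTypeFactory

    // Sketch: classify the ORDER BY field's type as in translateToPlan above.
    def windowKind(timeType: RelDataType): String = timeType match {
      case t if FlinkTypeFactory.isProctimeIndicatorType(t) => "proc-time OVER window"
      case t if FlinkTypeFactory.isRowtimeIndicatorType(t)  => "row-time OVER window"
      case _ => throw new TableException("OVER windows can only be applied on time attributes.")
    }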

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index ae172a5..03938f3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -34,4 +34,3 @@ trait DataStreamRel extends FlinkRelNode {
   def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
 
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c187ae8..05f60ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema}
 import org.apache.flink.types.Row
 
 /**
@@ -36,27 +36,27 @@ class DataStreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowRelDataType: RelDataType)
+    schema: RowSchema)
   extends TableScan(cluster, traitSet, table)
   with StreamScan {
 
   val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
-  override def deriveRowType(): RelDataType = rowRelDataType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
       cluster,
       traitSet,
       getTable,
-      getRowType
+      schema
     )
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-    convertToInternalRow(inputDataStream, dataStreamTable, config)
+    convertToInternalRow(schema, inputDataStream, dataStreamTable, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index f340ac7..47b4946 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
   * Flink RelNode which matches along with Union.
   *
@@ -36,11 +34,11 @@ class DataStreamUnion(
     traitSet: RelTraitSet,
     leftNode: RelNode,
     rightNode: RelNode,
-    rowRelDataType: RelDataType)
+    schema: RowSchema)
   extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType() = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamUnion(
@@ -48,7 +46,7 @@ class DataStreamUnion(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      getRowType
+      schema
     )
   }
 
@@ -57,7 +55,7 @@ class DataStreamUnion(
   }
 
   override def toString = {
-    s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
@@ -68,6 +66,6 @@ class DataStreamUnion(
   }
 
   private def unionSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
+    schema.logicalFieldNames.mkString(", ")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 0ab4a48..c964e03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -21,13 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.io.ValuesInputFormat
 import org.apache.flink.types.Row
 
@@ -39,19 +38,19 @@ import scala.collection.JavaConverters._
 class DataStreamValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    rowRelDataType: RelDataType,
+    schema: RowSchema,
     tuples: ImmutableList[ImmutableList[RexLiteral]],
     ruleDescription: String)
-  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  extends Values(cluster, schema.logicalType, tuples, traitSet)
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType() = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamValues(
       cluster,
       traitSet,
-      getRowType,
+      schema,
       getTuples,
       ruleDescription
     )
@@ -61,15 +60,13 @@ class DataStreamValues(
 
     val config = tableEnv.getConfig
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
     val generator = new CodeGenerator(config)
 
     // generate code for every record
     val generatedRecords = getTuples.asScala.map { r =>
       generator.generateResultExpression(
-        returnType,
-        getRowType.getFieldNames.asScala,
+        schema.physicalTypeInfo,
+        schema.physicalFieldNames,
         r.asScala)
     }
 
@@ -77,14 +74,14 @@ class DataStreamValues(
     val generatedFunction = generator.generateValuesInputFormat(
       ruleDescription,
       generatedRecords.map(_.code),
-      returnType)
+      schema.physicalTypeInfo)
 
     val inputFormat = new ValuesInputFormat[Row](
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, returnType)
+    tableEnv.execEnv.createInput(inputFormat, schema.physicalTypeInfo)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 6d08302..dd82819 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,42 +18,46 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
-import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.plan.schema.{FlinkTable, RowSchema}
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 trait StreamScan extends CommonScan with DataStreamRel {
 
   protected def convertToInternalRow(
+      schema: RowSchema,
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
       config: TableConfig)
     : DataStream[Row] = {
 
-    val inputType = input.getType
-
-    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
     // conversion
-    if (needsConversion(inputType, internalType)) {
+    if (needsConversion(input.getType, schema.physicalTypeInfo)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generatedConversionFunction(
         config,
-        inputType,
-        internalType,
+        classOf[MapFunction[Any, Row]],
+        input.getType,
+        schema.physicalTypeInfo,
         "DataStreamSourceConversion",
-        getRowType.getFieldNames,
+        schema.physicalFieldNames,
         Some(flinkTable.fieldIndexes))
 
-      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+      val runner = new MapRunner[Any, Row](
+        function.name,
+        function.code,
+        function.returnType)
+
+      val opName = s"from: (${schema.logicalFieldNames.mkString(", ")})"
 
-      input.map(mapFunc).name(opName)
+      // TODO we need a ProcessFunction here
+      input.map(runner).name(opName)
     }
     // no conversion necessary, forward
     else {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 0a466a3..5dc3da8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,10 +22,11 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable}
+import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -37,7 +38,50 @@ class StreamTableSourceScan(
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
   with StreamScan {
 
-  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    def removeIndex[T](idx: Int, l: List[T]): List[T] = {
+      if (l.size < idx) {
+        l
+      } else {
+        l.take(idx) ++ l.drop(idx + 1)
+      }
+    }
+
+    var fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+    val rowtime = tableSource match {
+      case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null =>
+        val rowtimeAttribute = timeSource.getRowtimeAttribute
+        // remove physical field if it is overwritten by time attribute
+        fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames)
+        fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes)
+        Some((rowtimeAttribute.f0, rowtimeAttribute.f1))
+      case _ =>
+        None
+    }
+
+    val proctime = tableSource match {
+      case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null =>
+        val proctimeAttribute = timeSource.getProctimeAttribute
+        // remove physical field if it is overwritten by time attribute
+        fieldNames = removeIndex(proctimeAttribute.f0, fieldNames)
+        fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes)
+        Some((proctimeAttribute.f0, proctimeAttribute.f1))
+      case _ =>
+        None
+    }
+
+    flinkTypeFactory.buildLogicalRowType(
+      fieldNames,
+      fieldTypes,
+      rowtime,
+      proctime)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }
@@ -67,6 +111,10 @@ class StreamTableSourceScan(
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
+    convertToInternalRow(
+      new RowSchema(getRowType),
+      inputDataStream,
+      new TableSourceTable(tableSource),
+      config)
   }
 }
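
The new deriveRowType strips any physical field that a declared time attribute
overwrites before building the logical row type. A small, self-contained sketch of how
the removeIndex helper behaves (behavior read off the hunk above, with example data):

    // Drops the element at idx; returns the list unchanged if idx is out of range.
    def removeIndex[T](idx: Int, l: List[T]): List[T] =
      if (l.size < idx) l else l.take(idx) ++ l.drop(idx + 1)

    removeIndex(1, List("id", "ts", "temp"))   // List(id, temp)
    removeIndex(5, List("id", "ts", "temp"))   // List(id, ts, temp)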

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
index b1f991e..11b227f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
@@ -45,7 +45,7 @@ class FlinkLogicalOverWindow(
       traitSet,
       inputs.get(0),
       windowConstants,
-      rowType,
+      getRowType,
       windowGroups)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index eacbafa..53e7b31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -47,9 +47,11 @@ class FlinkLogicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
+    flinkTypeFactory.buildLogicalRowType(
       TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+      TableEnvironment.getFieldTypes(tableSource.getReturnType),
+      None,
+      None)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 4da2da9..7577deb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -53,8 +53,8 @@ class WindowStartEndPropertiesRule
     transformed.push(LogicalWindowAggregate.create(
       agg.getWindow,
       Seq(
-        NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias)),
-        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias))
+        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
       ), agg)
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index f011b66..fc65403 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
+import org.apache.flink.table.plan.schema.RowSchema
 
 import scala.collection.JavaConversions._
 
@@ -65,8 +66,8 @@ class DataStreamAggregateRule
       traitSet,
       convInput,
       agg.getNamedAggCalls,
-      rel.getRowType,
-      agg.getInput.getRowType,
+      new RowSchema(rel.getRowType),
+      new RowSchema(agg.getInput.getRowType),
       agg.getGroupSet.toArray)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
index 1777264..0a1a31a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.plan.schema.RowSchema
 
 class DataStreamCalcRule
   extends ConverterRule(
@@ -42,7 +43,8 @@ class DataStreamCalcRule
       rel.getCluster,
       traitSet,
       convInput,
-      rel.getRowType,
+      new RowSchema(convInput.getRowType),
+      new RowSchema(rel.getRowType),
       calc.getProgram,
       description)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index ae39d40..cd0663e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rex.RexNode
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
+import org.apache.flink.table.plan.schema.RowSchema
 
 class DataStreamCorrelateRule
   extends ConverterRule(
@@ -68,11 +69,12 @@ class DataStreamCorrelateRule
           new DataStreamCorrelate(
             rel.getCluster,
             traitSet,
+            new RowSchema(convInput.getRowType),
             convInput,
             scan,
             condition,
-            rel.getRowType,
-            join.getRowType,
+            new RowSchema(rel.getRowType),
+            new RowSchema(join.getRowType),
             join.getJoinType,
             description)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 175a202..28efcf5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import java.math.BigDecimal
+import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexCall, RexLiteral, RexNode}
+import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.{TableException, Window}
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.functions.TimeModeTypes
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
 import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
@@ -49,16 +49,12 @@ class DataStreamLogicalWindowAggregateRule
 
     val timeType = windowExpression.operands.get(0).getType
     timeType match {
-      case TimeModeTypes.ROWTIME =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.ROWTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
-      case TimeModeTypes.PROCTIME =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.PROCTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+
+      case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) =>
+        rexBuilder.makeLiteral(0L, timeType, true)
+
       case _ =>
-        throw TableException(s"""Unexpected time type $timeType encountered""")
+        throw TableException(s"""Time attribute expected but $timeType encountered.""")
     }
   }
 
@@ -68,41 +64,41 @@ class DataStreamLogicalWindowAggregateRule
 
     def getOperandAsLong(call: RexCall, idx: Int): Long =
       call.getOperands.get(idx) match {
-        case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
-        case _ => throw new TableException("Only constant window descriptors are supported")
+        case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
+        case _ => throw new TableException("Only constant window descriptors are supported.")
+      }
+
+    def getOperandAsTimeIndicator(call: RexCall, idx: Int): String =
+      call.getOperands.get(idx) match {
+        case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) =>
+          rowType.getFieldList.get(v.getIndex).getName
+        case _ =>
+          throw new TableException("Window can only be defined over a time attribute column.")
       }
 
     windowExpr.getOperator match {
       case SqlStdOperatorTable.TUMBLE =>
+        val time = getOperandAsTimeIndicator(windowExpr, 0)
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        val window = windowExpr.getType match {
-          case TimeModeTypes.PROCTIME => w
-          case TimeModeTypes.ROWTIME => w.on("rowtime")
-        }
-        window.as("w$")
+        w.on(UnresolvedFieldReference(time)).as("w$")
 
       case SqlStdOperatorTable.HOP =>
+        val time = getOperandAsTimeIndicator(windowExpr, 0)
         val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
         val w = Slide
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        val window = windowExpr.getType match {
-          case TimeModeTypes.PROCTIME => w
-          case TimeModeTypes.ROWTIME => w.on("rowtime")
-        }
-        window.as("w$")
+        w.on(UnresolvedFieldReference(time)).as("w$")
+
       case SqlStdOperatorTable.SESSION =>
+        val time = getOperandAsTimeIndicator(windowExpr, 0)
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        val window = windowExpr.getType match {
-          case TimeModeTypes.PROCTIME => w
-          case TimeModeTypes.ROWTIME => w.on("rowtime")
-        }
-        window.as("w$")
+        w.on(UnresolvedFieldReference(time)).as("w$")
     }
   }
 }
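
The rewritten rule resolves the window's first operand to a time-indicator column and
builds the Table API window directly on it, instead of switching on TimeModeTypes. A
sketch of the window produced for a SQL TUMBLE over a one-minute interval (the 60000 ms
literal and the column name "rowtime" are illustrative):

    import org.apache.flink.table.api.scala.Tumble
    import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
    import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

    val w = Tumble
      .over(Literal(60000L, TimeIntervalTypeInfo.INTERVAL_MILLIS))
      .on(UnresolvedFieldReference("rowtime"))
      .as("w$")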

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index 8e96970..b3d7603 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow
+import org.apache.flink.table.plan.schema.RowSchema
 
 class DataStreamOverAggregateRule
   extends ConverterRule(
@@ -46,8 +47,8 @@ class DataStreamOverAggregateRule
       rel.getCluster,
       traitSet,
       convertInput,
-      rel.getRowType,
-      inputRowType)
+      new RowSchema(rel.getRowType),
+      new RowSchema(inputRowType))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
index 5bf60a7..d8dda80 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
-import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema}
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
 
 class DataStreamScanRule
@@ -53,7 +53,7 @@ class DataStreamScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      rel.getRowType
+      new RowSchema(rel.getRowType)
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 4241f53..8402f6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
+import org.apache.flink.table.plan.schema.RowSchema
 
 class DataStreamUnionRule
   extends ConverterRule(
@@ -44,7 +45,7 @@ class DataStreamUnionRule
       traitSet,
       convLeft,
       convRight,
-      rel.getRowType)
+      new RowSchema(rel.getRowType))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
index fbad21f..a1453a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamValues
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
+import org.apache.flink.table.plan.schema.RowSchema
 
 class DataStreamValuesRule
   extends ConverterRule(
@@ -40,7 +41,7 @@ class DataStreamValuesRule
     new DataStreamValues(
       rel.getCluster,
       traitSet,
-      rel.getRowType,
+      new RowSchema(rel.getRowType),
       values.getTuples,
       description)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index 6ce6570..70054b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -18,13 +18,27 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 
 class DataStreamTable[T](
     val dataStream: DataStream[T],
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String],
+    val rowtime: Option[(Int, String)],
+    val proctime: Option[(Int, String)],
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) {
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+
+    flinkTypeFactory.buildLogicalRowType(
+      fieldNames,
+      fieldTypes,
+      rowtime,
+      proctime)
+  }
 }
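
DataStreamTable now carries the optional time attributes as (index, name) pairs. A
hypothetical instantiation (the stream, field names, and indices are chosen purely for
illustration):

    // A table over a two-field stream with a processing-time attribute
    // appended at logical position 2; `stream` is an existing
    // DataStream[(Int, String)].
    val table = new DataStreamTable[(Int, String)](
      stream,
      Array(0, 1),                 // fieldIndexes
      Array("id", "name"),         // fieldNames
      rowtime  = None,
      proctime = Some((2, "proctime")))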

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index ea77061..752b00e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -48,10 +48,11 @@ abstract class FlinkTable[T](
   val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
       case cType: CompositeType[_] =>
-        if (fieldNames.length != cType.getArity) {
+        // it is ok to leave out fields
+        if (fieldNames.length > cType.getArity) {
          throw new TableException(
          s"Arity of type (" + cType.getFieldNames.deep + ") " +
-            "not equal to number of field names " + fieldNames.deep + ".")
+            "must not be less than number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
       case aType: AtomicType[_] =>
@@ -64,7 +65,7 @@ abstract class FlinkTable[T](
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
+    flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes, None, None)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
new file mode 100644
index 0000000..b42be82
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * Schema that describes both a logical and physical row.
+  */
+class RowSchema(private val logicalRowType: RelDataType) {
+
+  private lazy val physicalRowFields: Seq[RelDataTypeField] = logicalRowType.getFieldList filter {
+    field => !FlinkTypeFactory.isTimeIndicatorType(field.getType)
+  }
+
+  private lazy val physicalRowType: RelDataType = new RelRecordType(physicalRowFields)
+
+  private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = physicalRowFields map { f =>
+    FlinkTypeFactory.toTypeInfo(f.getType)
+  }
+
+  private lazy val physicalRowFieldNames: Seq[String] = physicalRowFields.map(_.getName)
+
+  private lazy val physicalRowTypeInfo: TypeInformation[Row] = new RowTypeInfo(
+    physicalRowFieldTypes.toArray, physicalRowFieldNames.toArray)
+
+  private lazy val indexMapping: Array[Int] = generateIndexMapping
+
+  private lazy val inputRefUpdater = new RexInputRefUpdater()
+
+  private def generateIndexMapping: Array[Int] = {
+    val mapping = new Array[Int](logicalRowType.getFieldCount)
+    var countTimeIndicators = 0
+    var i = 0
+    while (i < logicalRowType.getFieldCount) {
+      val t = logicalRowType.getFieldList.get(i).getType
+      if (FlinkTypeFactory.isTimeIndicatorType(t)) {
+        countTimeIndicators += 1
+        // no mapping
+        mapping(i) = -1
+      } else {
+        mapping(i) = i - countTimeIndicators
+      }
+      i += 1
+    }
+    mapping
+  }
+
+  private class RexInputRefUpdater extends RexShuttle {
+
+    override def visitInputRef(inputRef: RexInputRef): RexNode = {
+      new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
+    }
+  }
+
+  /**
+    * Returns the arity of the logical record.
+    */
+  def logicalArity: Int = logicalRowType.getFieldCount
+
+  /**
+    * Returns the arity of the physical record.
+    */
+  def physicalArity: Int = physicalTypeInfo.getArity
+
+  /**
+    * Returns a logical [[RelDataType]] including logical fields (i.e. time indicators).
+    */
+  def logicalType: RelDataType = logicalRowType
+
+  /**
+    * Returns a physical [[RelDataType]] with no logical fields (i.e. time indicators).
+    */
+  def physicalType: RelDataType = physicalRowType
+
+  /**
+    * Returns the physical [[TypeInformation]] of the row, without logical fields (time indicators).
+    */
+  def physicalTypeInfo: TypeInformation[Row] = physicalRowTypeInfo
+
+  /**
+    * Returns the [[TypeInformation]] of the row's fields, without logical fields (time indicators).
+    */
+  def physicalFieldTypeInfo: Seq[TypeInformation[_]] = physicalRowFieldTypes
+
+  /**
+    * Returns the logical field names, including logical fields (i.e. time indicators).
+    */
+  def logicalFieldNames: Seq[String] = logicalRowType.getFieldNames
+
+  /**
+    * Returns the physical field names, without logical fields (i.e. time indicators).
+    */
+  def physicalFieldNames: Seq[String] = physicalRowFieldNames
+
+  /**
+    * Converts logical indices to physical indices based on this schema.
+    */
+  def mapIndex(logicalIndex: Int): Int = {
+    val mappedIndex = indexMapping(logicalIndex)
+    if (mappedIndex < 0) {
+      throw new TableException("Invalid access to a logical field.")
+    } else {
+      mappedIndex
+    }
+  }
+
+  /**
+    * Converts logical indices of an aggregate call to physical ones.
+    */
+  def mapAggregateCall(logicalAggCall: AggregateCall): AggregateCall = {
+    logicalAggCall.copy(
+      logicalAggCall.getArgList.map(mapIndex(_).asInstanceOf[Integer]),
+      if (logicalAggCall.filterArg < 0) {
+        logicalAggCall.filterArg
+      } else {
+        mapIndex(logicalAggCall.filterArg)
+      }
+    )
+  }
+
+  /**
+    * Converts logical field references of a [[RexNode]] to physical ones.
+    */
+  def mapRexNode(logicalRexNode: RexNode): RexNode = logicalRexNode.accept(inputRefUpdater)
+
+}
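
A brief sketch of the logical-to-physical mapping that RowSchema provides, for a
logical row (a, proctime, b) whose middle field is a time indicator (the RelDataType
`logicalRowType` is assumed to be built elsewhere by a FlinkTypeFactory):

    val schema = new RowSchema(logicalRowType)
    schema.logicalArity     // 3 - time indicators count logically
    schema.physicalArity    // 2 - but are projected away physically
    schema.mapIndex(0)      // 0 - field a keeps its position
    schema.mapIndex(2)      // 1 - field b shifts left past the indicator
    // schema.mapIndex(1) throws TableException("Invalid access to a logical field.")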

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
new file mode 100644
index 0000000..5e27061
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
+import org.apache.calcite.sql.`type`.BasicSqlType
+
+/**
+  * Creates a time indicator type for event-time or processing-time, with properties
+  * similar to those of a basic SQL type.
+  */
+class TimeIndicatorRelDataType(
+    typeSystem: RelDataTypeSystem,
+    originalType: BasicSqlType,
+    val isEventTime: Boolean)
+  extends BasicSqlType(
+    typeSystem,
+    originalType.getSqlTypeName,
+    originalType.getPrecision) {
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TimeIndicatorRelDataType =>
+      super.equals(that) &&
+        isEventTime == that.isEventTime
+    case that: BasicSqlType =>
+      super.equals(that)
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
+  }
+}
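Note that TimeIndicatorRelDataType deliberately keeps equals-compatibility with BasicSqlType while
shifting its hash code: value-based comparisons still treat the indicator as a plain TIMESTAMP,
but hash-based bookkeeping can tell the two apart. This intentionally bends the usual
equals/hashCode contract; a stripped-down sketch of the pattern (plain classes, not Calcite types):

    class Base(val name: String) {
      override def equals(other: Any): Boolean = other match {
        case b: Base => name == b.name
        case _ => false
      }
      override def hashCode(): Int = name.hashCode
    }

    // Marker subtype: structurally equal to Base, but with a shifted hash code.
    class Marker(name: String) extends Base(name) {
      override def hashCode(): Int = super.hashCode() + 42
    }

    new Marker("ts") == new Base("ts")                    // true
    new Marker("ts").hashCode == new Base("ts").hashCode // false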

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
index 51e2fc5..32562c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -35,7 +35,7 @@ class MapRunner[IN, OUT](
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: MapFunction[IN, OUT] = null
+  private var function: MapFunction[IN, OUT] = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
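The MapRunner change is purely idiomatic Scala: `var x: T = _` initializes a field with its
default value (null for reference types) and is preferred over an explicit `null`. For example:

    class Holder {
      // Default-initialized to null; assigned later, e.g. after code generation/compilation.
      private var function: String => String = _

      def open(): Unit = {
        function = (s: String) => s.toUpperCase
      }

      def apply(s: String): String = function(s)
    }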

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e38207d..07992cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -26,17 +26,18 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunction, _}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
 import org.apache.flink.table.functions.utils.AggSqlFunction
@@ -61,26 +62,31 @@ object AggregateUtil {
     * window to evaluate final aggregate value.
     *
     * @param generator       code generator instance
-    * @param namedAggregates List of calls to aggregate functions and their output field names
-    * @param inputType Input row type
+    * @param namedAggregates Physical calls to aggregate functions and their output field names
+    * @param inputType Physical type of the row.
+    * @param inputTypeInfo Physical type information of the row.
+    * @param inputFieldTypeInfo Physical type information of the row's fields.
    * @param isRowTimeType Flag indicating whether the time type is row time
    * @param isPartitioned Flag indicating whether the input is partitioned
    * @param isRowsClause Flag indicating whether the OVER clause is a ROWS clause
     */
   private[flink] def createUnboundedOverProcessFunction(
-    generator: CodeGenerator,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    isRowTimeType: Boolean,
-    isPartitioned: Boolean,
-    isRowsClause: Boolean): ProcessFunction[Row, Row] = {
+      generator: CodeGenerator,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
+      isRowTimeType: Boolean,
+      isPartitioned: Boolean,
+      isRowsClause: Boolean)
+    : ProcessFunction[Row, Row] = {
 
     val needRetract = false
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
         namedAggregates.map(_.getKey),
         inputType,
-        needRetract)
+        needRetraction = false)
 
     val aggregationStateType: RowTypeInfo =
       createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
@@ -92,7 +98,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "UnboundedProcessingOverAggregateHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFields,
       aggMapping,
@@ -112,13 +118,13 @@ object AggregateUtil {
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
-          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+          inputTypeInfo)
       } else {
         // RANGE unbounded over process function
         new RowTimeUnboundedRangeOver(
           genFunction,
           aggregationStateType,
-          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+          inputTypeInfo)
       }
     } else {
       if (isPartitioned) {
@@ -138,20 +144,25 @@ object AggregateUtil {
     * bounded OVER window to evaluate final aggregate value.
     *
     * @param generator       code generator instance
-    * @param namedAggregates List of calls to aggregate functions and their output field names
-    * @param inputType       Input row type
+    * @param namedAggregates Physical calls to aggregate functions and their output field names
+    * @param inputType Physical type of the row.
+    * @param inputTypeInfo Physical type information of the row.
+    * @param inputFieldTypeInfo Physical type information of the row's fields.
    * @param precedingOffset The preceding offset
    * @param isRowsClause   Flag indicating whether the OVER clause is a ROWS clause
    * @param isRowTimeType   Flag indicating whether the time type is row time
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createBoundedOverProcessFunction(
-    generator: CodeGenerator,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    precedingOffset: Long,
-    isRowsClause: Boolean,
-    isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+      generator: CodeGenerator,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
+      precedingOffset: Long,
+      isRowsClause: Boolean,
+      isRowTimeType: Boolean)
+    : ProcessFunction[Row, Row] = {
 
     val needRetract = true
     val (aggFields, aggregates) =
@@ -161,7 +172,6 @@ object AggregateUtil {
         needRetract)
 
     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
-    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
 
     val forwardMapping = (0 until inputType.getFieldCount).toArray
     val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -170,7 +180,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "BoundedOverAggregateHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFields,
       aggMapping,
@@ -189,14 +199,14 @@ object AggregateUtil {
         new RowTimeBoundedRowsOver(
           genFunction,
           aggregationStateType,
-          inputRowType,
+          inputTypeInfo,
           precedingOffset
         )
       } else {
         new RowTimeBoundedRangeOver(
           genFunction,
           aggregationStateType,
-          inputRowType,
+          inputTypeInfo,
           precedingOffset
         )
       }
@@ -206,13 +216,13 @@ object AggregateUtil {
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputTypeInfo)
       } else {
         new ProcTimeBoundedRangeOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputTypeInfo)
       }
     }
   }
@@ -241,12 +251,13 @@ object AggregateUtil {
    * NOTE: this function is only used for time-based windows on batch tables.
     */
   def createDataSetWindowPrepareMapFunction(
-      generator: CodeGenerator,
-      window: LogicalWindow,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      groupings: Array[Int],
-      inputType: RelDataType,
-      isParserCaseSensitive: Boolean)
+    generator: CodeGenerator,
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    groupings: Array[Int],
+    inputType: RelDataType,
+    inputFieldTypeInfo: Seq[TypeInformation[_]],
+    isParserCaseSensitive: Boolean)
   : MapFunction[Row, Row] = {
 
     val needRetract = false
@@ -263,28 +274,28 @@ object AggregateUtil {
         Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
 
     val (timeFieldPos, tumbleTimeWindowSize) = window match {
-      case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, Some(asLong(size)))
 
-      case EventTimeTumblingGroupWindow(_, time, _) =>
+      case TumblingGroupWindow(_, time, size) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+        size match {
+          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+            (timeFieldPos, Some(value))
+          case _ => (timeFieldPos, None)
+        }
 
-      case EventTimeSessionGroupWindow(_, time, _) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+      case SessionGroupWindow(_, time, _) =>
+        (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)
 
-      case EventTimeSlidingGroupWindow(_, time, size, slide)
-          if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
-        // pre-tumble incremental aggregates on time-windows
+      case SlidingGroupWindow(_, time, size, slide) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
-        (timeFieldPos, Some(preTumblingSize))
-
-      case EventTimeSlidingGroupWindow(_, time, _, _) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+        size match {
+          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+            // pre-tumble incremental aggregates on time-windows
+            val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+            val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+            (timeFieldPos, Some(preTumblingSize))
+          case _ => (timeFieldPos, None)
+        }
 
       case _ =>
         throw new UnsupportedOperationException(s"$window is currently not supported on batch")
@@ -296,7 +307,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "DataSetAggregatePrepareMapHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -349,31 +360,32 @@ object AggregateUtil {
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       groupings: Array[Int],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       isParserCaseSensitive: Boolean)
     : RichGroupReduceFunction[Row, Row] = {
 
     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)
 
     val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
       groupings,
       aggregates,
-      inputType,
+      physicalInputRowType,
       Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
 
     val keysAndAggregatesArity = groupings.length + namedAggregates.length
 
     window match {
-      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
         // sliding time-window for partial aggregations
         val genFunction = generator.generateAggregations(
           "DataSetAggregatePrepareMapHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggregates.indices.map(_ + groupings.length).toArray,
@@ -433,7 +445,7 @@ object AggregateUtil {
     : FlatMapFunction[Row, Row] = {
 
     window match {
-      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
         new DataSetSlideTimeWindowAggFlatMapFunction(
           inputType.getArity - 1,
           asLong(size),
@@ -458,7 +470,8 @@ object AggregateUtil {
       generator: CodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       outputType: RelDataType,
       groupings: Array[Int],
       properties: Seq[NamedWindowProperty],
@@ -468,7 +481,7 @@ object AggregateUtil {
     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)
 
     val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
@@ -476,7 +489,7 @@ object AggregateUtil {
     val genPreAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      physicalInputTypes,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -493,7 +506,7 @@ object AggregateUtil {
     val genFinalAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      physicalInputTypes,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -510,7 +523,7 @@ object AggregateUtil {
     val keysAndAggregatesArity = groupings.length + namedAggregates.length
 
     window match {
-      case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
         // tumbling time window
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
@@ -532,13 +545,13 @@ object AggregateUtil {
             endPos,
             outputType.getFieldCount)
         }
-      case EventTimeTumblingGroupWindow(_, _, size) =>
+      case TumblingGroupWindow(_, _, size) =>
         // tumbling count window
         new DataSetTumbleCountWindowAggReduceGroupFunction(
           genFinalAggFunction,
           asLong(size))
 
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         new DataSetSessionWindowAggReduceGroupFunction(
           genFinalAggFunction,
@@ -548,7 +561,7 @@ object AggregateUtil {
           asLong(gap),
           isInputCombined)
 
-      case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for partial aggregations
@@ -570,7 +583,7 @@ object AggregateUtil {
             asLong(size))
         }
 
-      case EventTimeSlidingGroupWindow(_, _, size, _) =>
+      case SlidingGroupWindow(_, _, size, _) =>
         new DataSetSlideWindowAggReduceGroupFunction(
             genFinalAggFunction,
             keysAndAggregatesArity,
@@ -608,13 +621,14 @@ object AggregateUtil {
     generator: CodeGenerator,
     window: LogicalWindow,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
+    physicalInputRowType: RelDataType,
+    physicalInputTypes: Seq[TypeInformation[_]],
     groupings: Array[Int]): MapPartitionFunction[Row, Row] = {
 
     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)
 
     val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -622,18 +636,18 @@ object AggregateUtil {
     val keysAndAggregatesArity = groupings.length + namedAggregates.length
 
     window match {
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo =
           createDataSetAggregateBufferDataType(
             groupings,
             aggregates,
-            inputType,
+            physicalInputRowType,
             Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
 
         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggMapping,
@@ -679,14 +693,15 @@ object AggregateUtil {
       generator: CodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       groupings: Array[Int])
     : GroupCombineFunction[Row, Row] = {
 
     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)
 
     val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -695,18 +710,18 @@ object AggregateUtil {
 
     window match {
 
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo =
           createDataSetAggregateBufferDataType(
             groupings,
             aggregates,
-            inputType,
+            physicalInputRowType,
             Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
 
         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggMapping,
@@ -742,6 +757,7 @@ object AggregateUtil {
       generator: CodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
       groupings: Array[Int],
       inGroupingSet: Boolean): (Option[DataSetPreAggFunction],
@@ -786,7 +802,7 @@ object AggregateUtil {
       val genPreAggFunction = generator.generateAggregations(
         "DataSetAggregatePrepareMapHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggregates.indices.map(_ + groupings.length).toArray,
@@ -813,7 +829,7 @@ object AggregateUtil {
       val genFinalAggFunction = generator.generateAggregations(
         "DataSetAggregateFinalHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggOutFields,
@@ -837,7 +853,7 @@ object AggregateUtil {
       val genFunction = generator.generateAggregations(
         "DataSetAggregateHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggOutFields,
@@ -914,6 +930,7 @@ object AggregateUtil {
       generator: CodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
       needMerge: Boolean)
     : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
@@ -931,7 +948,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFields,
       aggMapping,
@@ -1047,12 +1064,9 @@ object AggregateUtil {
 
   private def isTimeWindow(window: LogicalWindow) = {
     window match {
-      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
-      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
-      case ProcessingTimeSessionGroupWindow(_, _) => true
-      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
-      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
-      case EventTimeSessionGroupWindow(_, _, _) => true
+      case TumblingGroupWindow(_, _, size) => isTimeIntervalLiteral(size)
+      case SlidingGroupWindow(_, _, size, _) => isTimeIntervalLiteral(size)
+      case SessionGroupWindow(_, _, _) => true
     }
   }
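A recurring pattern in the AggregateUtil changes: the former EventTime*/ProcessingTime* window
classes collapse into unified TumblingGroupWindow/SlidingGroupWindow/SessionGroupWindow cases, and
time vs. count semantics are now read off the size expression itself (a time-interval literal vs.
a row-count literal). A self-contained sketch of that dispatch with simplified stand-in types (not
the real LogicalWindow hierarchy):

    sealed trait Size
    case class TimeIntervalLiteral(millis: Long) extends Size
    case class RowCountLiteral(rows: Long) extends Size

    sealed trait GroupWindow
    case class TumblingGroupWindow(size: Size) extends GroupWindow
    case class SlidingGroupWindow(size: Size, slide: Size) extends GroupWindow
    case class SessionGroupWindow(gap: Size) extends GroupWindow

    // Mirrors the new isTimeWindow: session windows are always time-based, while
    // tumbling/sliding windows are time-based iff their size is a time interval.
    def isTimeWindow(window: GroupWindow): Boolean = window match {
      case TumblingGroupWindow(size)   => size.isInstanceOf[TimeIntervalLiteral]
      case SlidingGroupWindow(size, _) => size.isInstanceOf[TimeIntervalLiteral]
      case SessionGroupWindow(_)       => true
    }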
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 03ca02c..ef97e71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList, ArrayList => JArrayList}
 
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
@@ -39,8 +39,8 @@ import org.slf4j.LoggerFactory
  */
 class RowTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
+    aggregationStateType: TypeInformation[Row],
+    inputRowType: TypeInformation[Row],
     precedingOffset: Long)
   extends ProcessFunction[Row, Row]
     with Compiler[GeneratedAggregations] {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 4a9a14c..7169cf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory
 class RowTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
+    inputRowType: TypeInformation[Row],
     precedingOffset: Long)
   extends ProcessFunction[Row, Row]
     with Compiler[GeneratedAggregations] {
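Both RowTimeBounded*Over constructors now accept the more general TypeInformation[Row] instead of
RowTypeInfo. Since a RowTypeInfo is a TypeInformation[Row], existing call sites keep working,
while the functions no longer depend on composite-type specifics. A small illustration of the
widening (the helper function here is hypothetical):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    // Only the generic TypeInformation contract is needed, e.g. for state descriptors.
    def describeInput(inputRowType: TypeInformation[Row]): String =
      s"input row type: $inputRowType"

    val rowTypeInfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    describeInput(rowTypeInfo) // a RowTypeInfo is a TypeInformation[Row]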

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
new file mode 100644
index 0000000..8466cdf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.java.tuple.Tuple2
+
+/**
+  * Defines logical time attributes for a [[TableSource]]. Time attributes are used to indicate
+  * and access Flink's event-time or processing-time. A [[TableSource]] that implements this
+  * interface can define the names and positions of the rowtime and proctime attributes in the
+  * rows it produces.
+  */
+trait DefinedTimeAttributes {
+
+  /**
+    * Defines the name and position (starting at 0) of a rowtime attribute that represents
+    * Flink's event-time. Null if no rowtime should be available. If the position is within the
+    * arity of the result row, the logical attribute overwrites the physical attribute. If the
+    * position is at or beyond that arity, the time attribute is appended logically.
+    */
+  def getRowtimeAttribute: Tuple2[Int, String]
+
+  /**
+    * Defines the name and position (starting at 0) of a proctime attribute that represents
+    * Flink's processing-time. Null if no proctime should be available. If the position is within
+    * the arity of the result row, the logical attribute overwrites the physical attribute. If the
+    * position is at or beyond that arity, the time attribute is appended logically.
+    */
+  def getProctimeAttribute: Tuple2[Int, String]
+
+}
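As a usage sketch for the trait above: a source producing 3-field rows (id, temperature, eventTs)
could expose field 2 as the rowtime attribute and append a proctime attribute at position 3. The
class and field layout here are hypothetical:

    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.table.sources.DefinedTimeAttributes

    class SensorTimeAttributes extends DefinedTimeAttributes {

      // Overwrites physical field 2 (the long event timestamp) with a rowtime attribute.
      override def getRowtimeAttribute: Tuple2[Int, String] = new Tuple2(2, "rowtime")

      // Position 3 is beyond the arity of the 3-field rows, so proctime is appended logically.
      override def getProctimeAttribute: Tuple2[Int, String] = new Tuple2(3, "proctime")
    }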

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
new file mode 100644
index 0000000..31dcb5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.base.{SqlTimestampComparator, SqlTimestampSerializer}
+
+/**
+  * Type information for indicating event-time or processing-time. In all other respects, it
+  * behaves like a regular SQL timestamp.
+  */
+class TimeIndicatorTypeInfo(val isEventTime: Boolean)
+  extends SqlTimeTypeInfo[Timestamp](
+    classOf[Timestamp],
+    SqlTimestampSerializer.INSTANCE,
+    classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
+
+  override def toString: String = "TimeIndicatorTypeInfo"
+}
+
+object TimeIndicatorTypeInfo {
+
+  val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
+  val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
+
+}
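Downstream code can use the two singletons to tag and detect time attributes; because
TimeIndicatorTypeInfo extends SqlTimeTypeInfo[Timestamp], such a field still serializes and
compares like a regular SQL TIMESTAMP. A small sketch:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

    def describe(fieldType: TypeInformation[_]): String = fieldType match {
      case ti: TimeIndicatorTypeInfo if ti.isEventTime => "rowtime indicator"
      case _: TimeIndicatorTypeInfo                    => "proctime indicator"
      case other                                       => s"regular field of type $other"
    }

    describe(TimeIndicatorTypeInfo.ROWTIME_INDICATOR)  // "rowtime indicator"
    describe(TimeIndicatorTypeInfo.PROCTIME_INDICATOR) // "proctime indicator"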

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 40f0cf2..9896a8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.typeutils
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.table.validate._
@@ -29,6 +29,7 @@ object TypeCheckUtils {
     * SQL type but NOT vice versa.
     */
   def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: TimeIndicatorTypeInfo => false
     case _: BasicTypeInfo[_] => false
     case _: SqlTimeTypeInfo[_] => false
     case _: TimeIntervalTypeInfo[_] => false
@@ -64,6 +65,8 @@ object TypeCheckUtils {
 
   def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
 
+  def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
+
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
     case _ => false