Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:39 UTC

[05/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL

[FLINK-5884] [table] Integrate time indicators for Table API & SQL

This closes #3808.
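
For context, a rough sketch of how the integrated time indicators surface in
the Scala Table API after this change (the environment, stream, and field
names below are assumed for illustration and are not part of the commit):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // sample stream of (timestamp, user, amount)
    val stream = env.fromElements((1000L, "Bob", 1), (2000L, "Alice", 2))

    // 'ts.rowtime marks an event-time attribute on the first field,
    // 'pt.proctime appends a processing-time attribute to the schema
    val table = stream.toTable(tEnv, 'ts.rowtime, 'user, 'amount, 'pt.proctime)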


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/495f104b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/495f104b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/495f104b

Branch: refs/heads/master
Commit: 495f104b439096dc7eea5141bfe0de0283c5cc62
Parents: 28ab737
Author: twalthr <tw...@apache.org>
Authored: Thu Mar 2 16:06:55 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:31 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  25 +-
 .../table/api/StreamTableEnvironment.scala      |  79 ++--
 .../flink/table/api/TableEnvironment.scala      | 101 ++++-
 .../flink/table/api/scala/expressionDsl.scala   |  18 +-
 .../apache/flink/table/api/scala/windows.scala  |   2 +-
 .../org/apache/flink/table/api/table.scala      |  14 +-
 .../org/apache/flink/table/api/windows.scala    | 357 +++++++--------
 .../calcite/FlinkCalciteSqlValidator.scala      |  12 +-
 .../flink/table/calcite/FlinkPlannerImpl.scala  |   6 +-
 .../flink/table/calcite/FlinkTypeFactory.scala  | 132 +++++-
 .../calcite/RelTimeIndicatorConverter.scala     | 222 ++++++++++
 .../flink/table/codegen/CodeGenerator.scala     |  99 +++--
 .../table/codegen/calls/FunctionGenerator.scala |  10 -
 .../table/expressions/ExpressionUtils.scala     |  39 ++
 .../apache/flink/table/expressions/call.scala   |  16 +-
 .../table/expressions/fieldExpression.scala     |  48 +-
 .../TimeMaterializationSqlFunction.scala        |  41 ++
 .../functions/TimeModeIndicatorFunctions.scala  | 137 ------
 .../flink/table/plan/ProjectionTranslator.scala |  31 +-
 .../table/plan/logical/LogicalWindow.scala      |  14 +-
 .../flink/table/plan/logical/groupWindows.scala | 280 ++++--------
 .../flink/table/plan/logical/operators.scala    |  40 +-
 .../flink/table/plan/nodes/CommonCalc.scala     |  37 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  32 +-
 .../flink/table/plan/nodes/CommonScan.scala     |  24 +-
 .../flink/table/plan/nodes/OverAggregate.scala  |  35 +-
 .../plan/nodes/PhysicalTableSourceScan.scala    |   6 +-
 .../table/plan/nodes/dataset/BatchScan.scala    |  12 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |  14 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   8 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  21 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |   9 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  |  68 +--
 .../nodes/datastream/DataStreamAggregate.scala  | 178 ++++----
 .../plan/nodes/datastream/DataStreamCalc.scala  |  22 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  22 +-
 .../datastream/DataStreamOverAggregate.scala    | 112 ++---
 .../plan/nodes/datastream/DataStreamRel.scala   |   1 -
 .../plan/nodes/datastream/DataStreamScan.scala  |  10 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |  14 +-
 .../nodes/datastream/DataStreamValues.scala     |  21 +-
 .../plan/nodes/datastream/StreamScan.scala      |  32 +-
 .../datastream/StreamTableSourceScan.scala      |  58 ++-
 .../nodes/logical/FlinkLogicalOverWindow.scala  |   2 +-
 .../logical/FlinkLogicalTableSourceScan.scala   |   6 +-
 .../common/WindowStartEndPropertiesRule.scala   |   4 +-
 .../datastream/DataStreamAggregateRule.scala    |   5 +-
 .../rules/datastream/DataStreamCalcRule.scala   |   4 +-
 .../datastream/DataStreamCorrelateRule.scala    |   6 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |  56 ++-
 .../DataStreamOverAggregateRule.scala           |   5 +-
 .../rules/datastream/DataStreamScanRule.scala   |   4 +-
 .../rules/datastream/DataStreamUnionRule.scala  |   3 +-
 .../rules/datastream/DataStreamValuesRule.scala |   3 +-
 .../table/plan/schema/DataStreamTable.scala     |  14 +
 .../flink/table/plan/schema/FlinkTable.scala    |   7 +-
 .../flink/table/plan/schema/RowSchema.scala     | 152 +++++++
 .../plan/schema/TimeIndicatorRelDataType.scala  |  49 +++
 .../apache/flink/table/runtime/MapRunner.scala  |   2 +-
 .../table/runtime/aggregate/AggregateUtil.scala | 190 ++++----
 .../aggregate/RowTimeBoundedRangeOver.scala     |   6 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |   2 +-
 .../table/sources/DefinedTimeAttributes.scala   |  47 ++
 .../table/typeutils/TimeIndicatorTypeInfo.scala |  45 ++
 .../flink/table/typeutils/TypeCheckUtils.scala  |   5 +-
 .../flink/table/validate/FunctionCatalog.scala  |  10 +-
 .../api/java/batch/TableEnvironmentITCase.java  |   9 -
 .../flink/table/TableEnvironmentTest.scala      |  55 ++-
 .../scala/batch/TableEnvironmentITCase.scala    |  10 -
 .../scala/batch/sql/WindowAggregateTest.scala   |  18 +-
 .../scala/batch/table/FieldProjectionTest.scala |  36 +-
 .../api/scala/batch/table/GroupWindowTest.scala | 121 ++---
 .../table/api/scala/stream/sql/SqlITCase.scala  | 150 ++++---
 .../scala/stream/sql/WindowAggregateTest.scala  | 179 ++++----
 .../scala/stream/table/AggregationsITCase.scala |  12 +-
 .../api/scala/stream/table/CalcITCase.scala     |  16 -
 .../scala/stream/table/GroupWindowTest.scala    | 440 ++++++-------------
 .../scala/stream/table/OverWindowITCase.scala   |  12 +-
 .../api/scala/stream/table/OverWindowTest.scala | 101 ++---
 .../GroupWindowStringExpressionTest.scala       |   6 +-
 .../OverWindowStringExpressionTest.scala        |  16 +-
 .../datastream/DataStreamAggregateITCase.scala  |  22 +-
 82 files changed, 2368 insertions(+), 1921 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 00cf11c..3eb2ffc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -194,7 +194,30 @@ abstract class BatchTableEnvironment(
   protected def registerDataSetInternal[T](
       name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+      dataSet.getType,
+      fields,
+      ignoreTimeAttributes = true)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+
+    // don't allow proctime on batch
+    proctime match {
+      case Some(_) =>
+        throw new ValidationException(
+          "A proctime attribute is not allowed in a batch environment. " +
+            "Working with processing-time on batch would lead to non-deterministic results.")
+      case _ => // ok
+    }
+    // rowtime must not extend the schema of a batch table
+    rowtime match {
+      case Some((idx, _)) if idx >= dataSet.getType.getArity =>
+        throw new ValidationException(
+          "A rowtime attribute must be defined on an existing field in a batch environment.")
+      case _ => // ok
+    }
+
     val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
     registerTableInternal(name, dataSetTable)
   }

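A short (assumed) usage sketch of the new batch-side checks, given a
BatchTableEnvironment tEnv and a DataSet ds of type (Long, String):

    // rejected: processing-time is meaningless on finite batch input
    tEnv.registerDataSet("T1", ds, 'ts.proctime, 'name)  // ValidationException

    // accepted: the rowtime attribute maps onto the existing Long field
    tEnv.registerDataSet("T2", ds, 'ts.rowtime, 'name)
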
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f532c5b..d1f2fb5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -87,47 +88,6 @@ abstract class StreamTableEnvironment(
   protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
-    * @param inputType The TypeInformation extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' ia a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs     The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  override protected[flink] def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType, exprs)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' is a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
     * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
     * Registered tables can be referenced in SQL queries.
     *
@@ -145,6 +105,7 @@ abstract class StreamTableEnvironment(
             "StreamTableEnvironment")
     }
   }
+
   /**
     * Writes a [[Table]] to a [[TableSink]].
     *
@@ -185,7 +146,9 @@ abstract class StreamTableEnvironment(
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
       fieldIndexes,
-      fieldNames
+      fieldNames,
+      None,
+      None
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -200,15 +163,26 @@ abstract class StreamTableEnvironment(
     * @tparam T The type of the [[DataStream]].
     */
   protected def registerDataStreamInternal[T](
-    name: String,
-    dataStream: DataStream[T],
-    fields: Array[Expression]): Unit = {
+      name: String,
+      dataStream: DataStream[T],
+      fields: Array[Expression])
+    : Unit = {
+
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+      dataStream.getType,
+      fields,
+      ignoreTimeAttributes = false)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
       fieldIndexes,
-      fieldNames
+      fieldNames,
+      rowtime,
+      proctime
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -259,7 +233,10 @@ abstract class StreamTableEnvironment(
     // 1. decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
-    // 2. normalize the logical plan
+    // 2. convert time indicators
+    val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
+
+    // 3. normalize the logical plan
     val normRuleSet = getNormRuleSet
     val normalizedPlan = if (normRuleSet.iterator().hasNext) {
       runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
     } else {
       convPlan
     }
 
-    // 3. optimize the logical Flink plan
+    // 4. optimize the logical Flink plan
     val logicalOptRuleSet = getLogicalOptRuleSet
     val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
     val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
@@ -276,7 +253,7 @@ abstract class StreamTableEnvironment(
       normalizedPlan
     }
 
-    // 4. optimize the physical Flink plan
+    // 5. optimize the physical Flink plan
     val physicalOptRuleSet = getPhysicalOptRuleSet
     val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
     val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
@@ -285,7 +262,7 @@ abstract class StreamTableEnvironment(
       logicalPlan
     }
 
-    // 5. decorate the optimized plan
+    // 6. decorate the optimized plan
     val decoRuleSet = getDecoRuleSet
     val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
       runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)

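On the stream side, a time attribute that does not reference an input field
extends the schema. A hypothetical example (tEnv and stream assumed):

    // 'pt does not exist in the input type DataStream[(Int, String)];
    // the proctime attribute is appended, yielding fields [id, name, pt]
    tEnv.registerDataStream("S", stream, 'id, 'name, 'pt.proctime)
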
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 06c405e..4c72e8f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -52,6 +52,9 @@ import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -598,70 +601,94 @@ abstract class TableEnvironment(val config: TableConfig) {
 
   /**
     * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
+    * [[Expression]]. Time attributes are not resolved here: if the ignore flag is set, they
+    * are handled as regular field expressions; otherwise they are excluded from the result.
     *
     * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
     * @param exprs The expressions that define the field names.
+    * @param ignoreTimeAttributes if true, time attributes are handled as regular field expressions.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](
-    inputType: TypeInformation[A],
-    exprs: Array[Expression]): (Array[String], Array[Int]) = {
+      inputType: TypeInformation[A],
+      exprs: Array[Expression],
+      ignoreTimeAttributes: Boolean)
+    : (Array[String], Array[Int]) = {
 
     TableEnvironment.validateType(inputType)
 
+    val filteredExprs = if (ignoreTimeAttributes) {
+      exprs.map {
+        case ta: TimeAttribute => ta.expression
+        case e => e
+      }
+    } else {
+      exprs
+    }
+
     val indexedNames: Array[(Int, String)] = inputType match {
       case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
             "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
-        if (exprs.length != 1) {
-          throw new TableException("Table of atomic type can only have a single field.")
-        }
-        exprs.map {
-          case UnresolvedFieldReference(name) => (0, name)
+        filteredExprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            if (idx > 0) {
+              throw new TableException("Table of atomic type can only have a single field.")
+            }
+            Some((0, name))
+          case (_: TimeAttribute, _) if ignoreTimeAttributes =>
+            None
           case _ => throw new TableException("Field reference expression requested.")
         }
       case t: TupleTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
+        filteredExprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            Some((idx, name))
           case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
             val idx = t.getFieldIndex(origName)
             if (idx < 0) {
               throw new TableException(s"$origName is not a field of type $t")
             }
-            (idx, name)
+            Some((idx, name))
+          case (_: TimeAttribute, _) =>
+            None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
       case c: CaseClassTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
+        filteredExprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            Some((idx, name))
           case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
             val idx = c.getFieldIndex(origName)
             if (idx < 0) {
               throw new TableException(s"$origName is not a field of type $c")
             }
-            (idx, name)
+            Some((idx, name))
+          case (_: TimeAttribute, _) =>
+            None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
       case p: PojoTypeInfo[A] =>
-        exprs.map {
+        filteredExprs flatMap {
           case (UnresolvedFieldReference(name)) =>
             val idx = p.getFieldIndex(name)
             if (idx < 0) {
               throw new TableException(s"$name is not a field of type $p")
             }
-            (idx, name)
+            Some((idx, name))
           case Alias(UnresolvedFieldReference(origName), name, _) =>
             val idx = p.getFieldIndex(origName)
             if (idx < 0) {
               throw new TableException(s"$origName is not a field of type $p")
             }
-            (idx, name)
+            Some((idx, name))
+          case _: TimeAttribute =>
+            None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
@@ -795,6 +822,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     Some(mapFunction)
   }
 
+  /**
+    * Checks for at most one rowtime and proctime attribute.
+    * Returns the time attributes.
+    *
+    * @return rowtime attribute and proctime attribute
+    */
+  protected def validateAndExtractTimeAttributes(
+      fieldNames: Seq[String],
+      fieldIndices: Seq[Int],
+      exprs: Array[Expression])
+    : (Option[(Int, String)], Option[(Int, String)]) = {
+
+    var rowtime: Option[(Int, String)] = None
+    var proctime: Option[(Int, String)] = None
+
+    exprs.zipWithIndex.foreach {
+      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+        if (rowtime.isDefined) {
+          throw new TableException(
+            "The rowtime attribute can only be defined once in a table schema.")
+        } else {
+          rowtime = Some(idx, name)
+        }
+      case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+        if (proctime.isDefined) {
+          throw new TableException(
+            "The proctime attribute can only be defined once in a table schema.")
+        } else {
+          proctime = Some(idx, name)
+        }
+      case _ =>
+        // do nothing
+    }
+
+    (rowtime, proctime)
+  }
 }
 
 /**
@@ -803,6 +866,10 @@ abstract class TableEnvironment(val config: TableConfig) {
   */
 object TableEnvironment {
 
+  // default names that can be used in TableSources etc.
+  val DEFAULT_ROWTIME_ATTRIBUTE = "rowtime"
+  val DEFAULT_PROCTIME_ATTRIBUTE = "proctime"
+
   /**
     * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]].
     *

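validateAndExtractTimeAttributes enforces at most one attribute of each kind;
for illustration (table, stream, and field names assumed):

    // fails: "The rowtime attribute can only be defined once in a table schema."
    tEnv.registerDataStream("T", stream, 'a.rowtime, 'b.rowtime, 'c)
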
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index cc58ff5..6d15212 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -625,7 +625,7 @@ trait ImplicitExpressionOperations {
     */
   def millis = milli
 
-  // row interval type
+  // Row interval type
 
   /**
     * Creates an interval of rows.
@@ -634,6 +634,8 @@ trait ImplicitExpressionOperations {
     */
   def rows = toRowInterval(expr)
 
+  // Advanced type helper functions
+
   /**
     * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
    * returns its value.
@@ -680,6 +682,20 @@ trait ImplicitExpressionOperations {
     * @return the first and only element of an array with a single element
     */
   def element() = ArrayElement(expr)
+
+  // Schema definition
+
+  /**
+    * Declares a field as the rowtime attribute for indicating, accessing, and working in
+    * Flink's event time.
+    */
+  def rowtime = RowtimeAttribute(expr)
+
+  /**
+    * Declares a field as the proctime attribute for indicating, accessing, and working in
+    * Flink's processing time.
+    */
+  def proctime = ProctimeAttribute(expr)
 }
 
 /**

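The two new suffix operators simply wrap the field expression, e.g. (using the
implicit symbol-to-expression conversion of the Scala API):

    'ts.rowtime    // RowtimeAttribute(UnresolvedFieldReference("ts"))
    'pt.proctime   // ProctimeAttribute(UnresolvedFieldReference("pt"))
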
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 5e70440..d0430c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize}
 import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap}
 
 /**
   * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 87dde0a..dd8265b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -20,12 +20,12 @@ package org.apache.flink.table.api
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
-import org.apache.flink.table.plan.logical.Minus
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
-import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.sinks.TableSink
 
 import _root_.scala.collection.JavaConverters._
@@ -1015,13 +1015,7 @@ class WindowGroupedTable(
     val projectsOnAgg = replaceAggregationsAndProperties(
       fields, table.tableEnv, aggNames, propNames)
 
-    val projectFields = (table.tableEnv, window) match {
-      // event time can be arbitrary field in batch environment
-      case (_: BatchTableEnvironment, w: EventTimeWindow) =>
-        extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField))
-      case (_, _) =>
-        extractFieldReferences(fields ++ groupKeys)
-    }
+    val projectFields = extractFieldReferences(fields ++ groupKeys :+ window.timeField)
 
     new Table(table.tableEnv,
       Project(

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 80260f7..11ef360 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -149,7 +149,7 @@ class OverWindowWithOrderBy(
   * A window specification.
   *
   * Window groups rows based on time or row-count intervals. It is a general way to group the
-  * elements, which is very helpful for both groupby-aggregations and over-aggregations to
+  * elements, which is very helpful for both groupBy-aggregations and over-aggregations to
   * compute aggregates on groups of elements.
   *
   * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
@@ -157,111 +157,73 @@ class OverWindowWithOrderBy(
   *
   * For finite batch tables, window provides shortcuts for time-based groupBy.
   *
-  * @param alias The expression of alias for this Window
   */
-abstract class Window(val alias: Expression) {
+abstract class Window(val alias: Expression, val timeField: Expression) {
 
   /**
     * Converts an API class to a logical window for planning.
     */
   private[flink] def toLogicalWindow: LogicalWindow
+
 }
 
+// ------------------------------------------------------------------------------------------------
+// Tumbling windows
+// ------------------------------------------------------------------------------------------------
+
 /**
-  * A window specification without alias.
+  * Tumbling window.
+  *
+  * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+  *
+  * For batch tables you can specify grouping on a timestamp or long attribute.
+  *
+  * @param size the size of the window either as time or row-count interval.
   */
-abstract class WindowWithoutAlias {
+class TumbleWithSize(size: Expression) {
 
   /**
-    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
-    * refer to. `select()` statement can access window properties such as window start or end time.
+    * Tumbling window.
     *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): Window
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
-    * refer to. `select()` statement can access window properties such as window start or end time.
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
     *
-    * @param alias alias for this window
-    * @return this window
+    * For batch tables you can specify grouping on a timestamp or long attribute.
+    *
+    * @param size the size of the window either as time or row-count interval.
     */
-  def as(alias: String): Window = as(ExpressionParser.parseExpression(alias))
-}
-
-/**
-  * A predefined specification of window on processing-time
-  */
-abstract class ProcTimeWindowWithoutAlias extends WindowWithoutAlias {
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
 
   /**
     * Specifies the time attribute on which rows are grouped.
     *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * For batch tables you can specify grouping on a timestamp or long attribute.
     *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a predefined window on event-time
+    * @param timeField time attribute for streaming and batch tables
+    * @return a tumbling window on event-time
     */
-  def on(timeField: Expression): WindowWithoutAlias
+  def on(timeField: Expression): TumbleWithSizeOnTime =
+    new TumbleWithSizeOnTime(timeField, size)
 
   /**
     * Specifies the time attribute on which rows are grouped.
     *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * For batch tables you can specify grouping on a timestamp or long attribute.
     *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a predefined window on event-time
+    * @param timeField time attribute for streaming and batch tables
+    * @return a tumbling window on event-time
     */
-  def on(timeField: String): WindowWithoutAlias =
+  def on(timeField: String): TumbleWithSizeOnTime =
     on(ExpressionParser.parseExpression(timeField))
 }
 
 /**
-  * A window operating on event-time.
-  *
-  * For streaming tables call on('rowtime) to specify grouping by event-time.
-  * Otherwise rows are grouped by processing-time.
-  *
-  * For batch tables, refer to a timestamp or long attribute.
-  *
-  * @param timeField time mode for streaming tables and time attribute for batch tables
+  * Tumbling window on time.
   */
-abstract class EventTimeWindow(alias: Expression, val timeField: Expression) extends Window(alias)
-
-// ------------------------------------------------------------------------------------------------
-// Tumbling windows
-// ------------------------------------------------------------------------------------------------
-
-/**
-  * A partial specification of a tumbling window.
-  *
-  * @param size the size of the window either a time or a row-count interval.
-  */
-class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias {
-
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time.
-    * Otherwise rows are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a predefined window on event-time
-    */
-  override def on(timeField: Expression): WindowWithoutAlias =
-    new TumbleWithoutAlias(timeField, size)
+class TumbleWithSizeOnTime(time: Expression, size: Expression) {
 
   /**
     * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
@@ -270,15 +232,9 @@ class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias {
     * @param alias alias for this window
     * @return this window
     */
-  override def as(alias: Expression) = new TumblingWindow(alias, size)
-}
-
-/**
-  * A tumbling window on event-time without alias.
-  */
-class TumbleWithoutAlias(
-    time: Expression,
-    size: Expression) extends WindowWithoutAlias {
+  def as(alias: Expression): TumbleWithSizeOnTimeWithAlias = {
+    new TumbleWithSizeOnTimeWithAlias(alias, time, size)
+  }
 
   /**
     * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
@@ -287,31 +243,28 @@ class TumbleWithoutAlias(
     * @param alias alias for this window
     * @return this window
     */
-  override def as(alias: Expression): Window = new TumblingEventTimeWindow(alias, time, size)
-}
-
-/**
-  * Tumbling window on processing-time.
-  *
-  * @param alias the alias of the window.
-  * @param size the size of the window either a time or a row-count interval.
-  */
-class TumblingWindow(alias: Expression, size: Expression) extends Window(alias) {
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeTumblingGroupWindow(alias, size)
+  def as(alias: String): TumbleWithSizeOnTimeWithAlias = {
+    as(ExpressionParser.parseExpression(alias))
+  }
 }
 
 /**
-  * Tumbling window on event-time.
+  * Tumbling window on time with alias. Fully specifies a window.
   */
-class TumblingEventTimeWindow(
+class TumbleWithSizeOnTimeWithAlias(
     alias: Expression,
-    time: Expression,
-    size: Expression) extends EventTimeWindow(alias, time) {
+    timeField: Expression,
+    size: Expression)
+  extends Window(
+    alias,
+    timeField) {
 
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeTumblingGroupWindow(alias, time, size)
+  /**
+    * Converts an API class to a logical window for planning.
+    */
+  override private[flink] def toLogicalWindow: LogicalWindow = {
+    TumblingGroupWindow(alias, timeField, size)
+  }
 }
 
 // ------------------------------------------------------------------------------------------------
@@ -319,16 +272,16 @@ class TumblingEventTimeWindow(
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * A partially specified sliding window.
+  * Partially specified sliding window.
   *
-  * @param size the size of the window either a time or a row-count interval.
+  * @param size the size of the window either as time or row-count interval.
   */
 class SlideWithSize(size: Expression) {
 
   /**
-    * A partially specified sliding window.
+    * Partially specified sliding window.
     *
-    * @param size the size of the window either a time or a row-count interval.
+    * @param size the size of the window either as time or row-count interval.
     */
   def this(size: String) = this(ExpressionParser.parseExpression(size))
 
@@ -343,9 +296,9 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a predefined sliding window.
+    * @return a sliding window
     */
-  def every(slide: Expression): SlideWithSlide = new SlideWithSlide(size, slide)
+  def every(slide: Expression): SlideWithSizeAndSlide = new SlideWithSizeAndSlide(size, slide)
 
   /**
     * Specifies the window's slide as time or row-count interval.
@@ -358,48 +311,54 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a predefined sliding window.
+    * @return a sliding window
     */
-  def every(slide: String): WindowWithoutAlias = every(ExpressionParser.parseExpression(slide))
+  def every(slide: String): SlideWithSizeAndSlide = every(ExpressionParser.parseExpression(slide))
 }
 
 /**
-  * A partially defined sliding window.
+  * Sliding window.
+  *
+  * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+  *
+  * For batch tables you can specify grouping on a timestamp or long attribute.
+  *
+  * @param size the size of the window either as time or row-count interval.
   */
-class SlideWithSlide(
-    size: Expression,
-    slide: Expression) extends ProcTimeWindowWithoutAlias {
+class SlideWithSizeAndSlide(size: Expression, slide: Expression) {
+
   /**
     * Specifies the time attribute on which rows are grouped.
     *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * For batch tables you can specify grouping on a timestamp or long attribute.
     *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a predefined Sliding window on event-time.
+    * @param timeField time attribute for streaming and batch tables
+    * @return a sliding window on event-time
     */
-  override def on(timeField: Expression): SlideWithoutAlias =
-    new SlideWithoutAlias(timeField, size, slide)
+  def on(timeField: Expression): SlideWithSizeAndSlideOnTime =
+    new SlideWithSizeAndSlideOnTime(timeField, size, slide)
 
   /**
-    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
-    * refer to. `select()` statement can access window properties such as window start or end time.
+    * Specifies the time attribute on which rows are grouped.
     *
-    * @param alias alias for this window
-    * @return this window
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+    *
+    * For batch tables you can specify grouping on a timestamp or long attribute.
+    *
+    * @param timeField time attribute for streaming and batch tables
+    * @return a sliding window on event-time
     */
-  override def as(alias: Expression): Window = new SlidingWindow(alias, size, slide)
+  def on(timeField: String): SlideWithSizeAndSlideOnTime =
+    on(ExpressionParser.parseExpression(timeField))
 }
 
 /**
-  * A partially defined sliding window on event-time without alias.
+  * Sliding window on time.
   */
-class SlideWithoutAlias(
-    timeField: Expression,
-    size: Expression,
-    slide: Expression) extends WindowWithoutAlias {
+class SlideWithSizeAndSlideOnTime(timeField: Expression, size: Expression, slide: Expression) {
+
   /**
     * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
     * refer to. `select()` statement can access window properties such as window start or end time.
@@ -407,39 +366,40 @@ class SlideWithoutAlias(
     * @param alias alias for this window
     * @return this window
     */
-  override def as(alias: Expression): Window =
-    new SlidingEventTimeWindow(alias, timeField, size, slide)
-}
+  def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias = {
+    new SlideWithSizeAndSlideOnTimeWithAlias(alias, timeField, size, slide)
+  }
 
-/**
-  * A sliding window on processing-time.
-  *
-  * @param alias the alias of the window.
-  * @param size the size of the window either a time or a row-count interval.
-  * @param slide the interval by which the window slides.
-  */
-class SlidingWindow(
-  alias: Expression,
-  size: Expression,
-  slide: Expression)
-  extends Window(alias) {
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeSlidingGroupWindow(alias, size, slide)
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SlideWithSizeAndSlideOnTimeWithAlias = {
+    as(ExpressionParser.parseExpression(alias))
+  }
 }
 
 /**
-  * A sliding window on event-time.
+  * Sliding window on time with alias. Fully specifies a window.
   */
-class SlidingEventTimeWindow(
+class SlideWithSizeAndSlideOnTimeWithAlias(
     alias: Expression,
     timeField: Expression,
     size: Expression,
     slide: Expression)
-  extends EventTimeWindow(alias, timeField) {
+  extends Window(
+    alias,
+    timeField) {
 
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSlidingGroupWindow(alias, timeField, size, slide)
+  /**
+    * Converts an API class to a logical window for planning.
+    */
+  override private[flink] def toLogicalWindow: LogicalWindow = {
+    SlidingGroupWindow(alias, timeField, size, slide)
+  }
 }
 
 // ------------------------------------------------------------------------------------------------
@@ -447,42 +407,59 @@ class SlidingEventTimeWindow(
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * A partially defined session window.
+  * Session window.
+  *
+  * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+  *
+  * For batch tables you can specify grouping on a timestamp or long attribute.
+  *
+  * @param gap the time interval of inactivity before a window is closed.
   */
-class SessionWithGap(gap: Expression) extends ProcTimeWindowWithoutAlias {
+class SessionWithGap(gap: Expression) {
 
+  /**
+    * Session window.
+    *
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+    *
+    * For batch tables you can specify grouping on a timestamp or long attribute.
+    *
+    * @param gap the time interval of inactivity before a window is closed.
+    */
   def this(gap: String) = this(ExpressionParser.parseExpression(gap))
 
   /**
     * Specifies the time attribute on which rows are grouped.
     *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * For batch tables you can specify grouping on a timestamp or long attribute.
     *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return an on event-time session window on event-time
+    * @param timeField time attribute for streaming and batch tables
+    * @return a session window on event-time
     */
-  override def on(timeField: Expression): SessionWithoutAlias =
-    new SessionWithoutAlias(timeField, gap)
+  def on(timeField: Expression): SessionWithGapOnTime =
+    new SessionWithGapOnTime(timeField, gap)
 
   /**
-    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
-    * refer to. `select()` statement can access window properties such as window start or end time.
+    * Specifies the time attribute on which rows are grouped.
     *
-    * @param alias alias for this window
-    * @return this window
+    * For streaming tables you can specify grouping by an event-time or processing-time attribute.
+    *
+    * For batch tables you can specify grouping on a timestamp or long attribute.
+    *
+    * @param timeField time attribute for streaming and batch tables
+    * @return a session window on event-time
     */
-  override def as(alias: Expression): Window = new SessionWindow(alias, gap)
+  def on(timeField: String): SessionWithGapOnTime =
+    on(ExpressionParser.parseExpression(timeField))
 }
 
 /**
-  * A partially defined session window on event-time without alias.
+  * Session window on time.
   */
-class SessionWithoutAlias(
-    timeField: Expression,
-    gap: Expression) extends WindowWithoutAlias {
+class SessionWithGapOnTime(timeField: Expression, gap: Expression) {
+
   /**
     * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
     * refer to. `select()` statement can access window properties such as window start or end time.
@@ -490,29 +467,37 @@ class SessionWithoutAlias(
     * @param alias alias for this window
     * @return this window
     */
-  override def as(alias: Expression): Window = new SessionEventTimeWindow(alias, timeField, gap)
-}
-
-/**
-  * A session window on processing-time.
-  *
-  * @param gap the time interval of inactivity before a window is closed.
-  */
-class SessionWindow(alias: Expression, gap: Expression) extends Window(alias) {
+  def as(alias: Expression): SessionWithGapOnTimeWithAlias = {
+    new SessionWithGapOnTimeWithAlias(alias, timeField, gap)
+  }
 
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeSessionGroupWindow(alias, gap)
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SessionWithGapOnTimeWithAlias = {
+    as(ExpressionParser.parseExpression(alias))
+  }
 }
 
 /**
-  * A session window on event-time.
+  * Session window on time with alias. Fully specifies a window.
   */
-class SessionEventTimeWindow(
+class SessionWithGapOnTimeWithAlias(
     alias: Expression,
     timeField: Expression,
     gap: Expression)
-  extends EventTimeWindow(alias, timeField) {
+  extends Window(
+    alias,
+    timeField) {
 
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSessionGroupWindow(alias, timeField, gap)
+  /**
+    * Converts an API class to a logical window for planning.
+    */
+  override private[flink] def toLogicalWindow: LogicalWindow = {
+    SessionGroupWindow(alias, timeField, gap)
+  }
 }

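With the restructured builder classes, every group window is specified as
size/gap, then time attribute, then alias. A sketch in the Scala DSL (table
and field names assumed):

    table
      .window(Tumble over 10.minutes on 'rowtime as 'w)
      .groupBy('w, 'user)
      .select('user, 'amount.sum, 'w.start, 'w.end)

    // sliding and session windows follow the same shape:
    Slide over 10.minutes every 5.minutes on 'ts as 'w
    Session withGap 30.minutes on 'pt as 'w
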
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
index b4a3c42..2bdf360 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.calcite
 import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.validate.{SqlConformance, SqlValidatorImpl}
-import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl}
 
 /**
  * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
@@ -30,8 +30,12 @@ import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
 class FlinkCalciteSqlValidator(
     opTab: SqlOperatorTable,
     catalogReader: CalciteCatalogReader,
-    typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
-        opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+    factory: JavaTypeFactory)
+  extends SqlValidatorImpl(
+    opTab,
+    catalogReader,
+    factory,
+    SqlConformanceEnum.DEFAULT) {
 
   override def getLogicalSourceRowType(
       sourceRowType: RelDataType,

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 09e3277..beb2436 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -107,7 +107,11 @@ class FlinkPlannerImpl(
       // we disable automatic flattening in order to let composite types pass without modification
       // we might enable it again once Calcite has better support for structured types
       // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+
+      // TableEnvironment.optimize will execute the following
+      // root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      // convert time indicators
+      // root = root.withRel(RelTimeIndicatorConverter.convert(root.rel, rexBuilder))
       root
     } catch {
       case e: RelConversionException => throw TableException(e.getMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 7762ff8..001011b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -20,25 +20,25 @@ package org.apache.flink.table.calcite
 
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.rel.`type`._
 import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
   * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
@@ -65,6 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           createSqlIntervalType(
             new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
 
+        case TimeIndicatorTypeInfo.ROWTIME_INDICATOR =>
+          createRowtimeIndicatorType()
+
+        case TimeIndicatorTypeInfo.PROCTIME_INDICATOR =>
+          createProctimeIndicatorType()
+
         case _ =>
           createSqlType(sqlType)
       }
@@ -77,23 +83,75 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   /**
+    * Creates an indicator type for processing-time that otherwise behaves like a SQL timestamp.
+    */
+  def createProctimeIndicatorType(): RelDataType = {
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime = false)
+    )
+  }
+
+  /**
+    * Creates an indicator type for event-time that otherwise behaves like a SQL timestamp.
+    */
+  def createRowtimeIndicatorType(): RelDataType = {
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime = true)
+    )
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @return a struct type with the input fieldNames and input fieldTypes
+    * @param rowtime optional system field to indicate event-time; the index determines the index
+    *                in the final record and might replace an existing field
+    * @param proctime optional system field to indicate processing-time; the index determines the
+    *                 index in the final record and might replace an existing field
+    * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
-  def buildRowDataType(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]])
+  def buildLogicalRowType(
+      fieldNames: Seq[String],
+      fieldTypes: Seq[TypeInformation[_]],
+      rowtime: Option[(Int, String)],
+      proctime: Option[(Int, String)])
     : RelDataType = {
-    val rowDataTypeBuilder = builder
-    fieldNames
-      .zip(fieldTypes)
-      .foreach { f =>
-        rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
+    val logicalRowTypeBuilder = builder
+
+    val fields = fieldNames.zip(fieldTypes)
+
+    var totalNumberOfFields = fields.length
+    if (rowtime.isDefined) {
+      totalNumberOfFields += 1
+    }
+    if (proctime.isDefined) {
+      totalNumberOfFields += 1
+    }
+
+    var addedTimeAttributes = 0
+    for (i <- 0 until totalNumberOfFields) {
+      if (rowtime.isDefined && rowtime.get._1 == i) {
+        logicalRowTypeBuilder.add(rowtime.get._2, createRowtimeIndicatorType())
+        addedTimeAttributes += 1
+      } else if (proctime.isDefined && proctime.get._1 == i) {
+        logicalRowTypeBuilder.add(proctime.get._2, createProctimeIndicatorType())
+        addedTimeAttributes += 1
+      } else {
+        val field = fields(i - addedTimeAttributes)
+        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
       }
-    rowDataTypeBuilder.build
+    }
+
+    logicalRowTypeBuilder.build
   }
 
   override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
@@ -178,6 +236,7 @@ object FlinkTypeFactory {
   /**
     * Converts a Calcite logical record into a Flink type information.
     */
+  @deprecated("Use the RowSchema class instead because it handles both logical and physical rows.")
   def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
     // convert to type information
     val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
@@ -188,6 +247,36 @@ object FlinkTypeFactory {
     new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
   }
 
+  def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case ti: TimeIndicatorRelDataType if !ti.isEventTime => true
+    case _ => false
+  }
+
+  def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
+    case _ => false
+  }
+
+  def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case ti: TimeIndicatorRelDataType if ti.isEventTime => true
+    case _ => false
+  }
+
+  def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
+    case _ => false
+  }
+
+  def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
+    case _: TimeIndicatorRelDataType => true
+    case _ => false
+  }
+
+  def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case _: TimeIndicatorTypeInfo => true
+    case _ => false
+  }
+
   def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
     case BOOLEAN => BOOLEAN_TYPE_INFO
     case TINYINT => BYTE_TYPE_INFO
@@ -199,6 +288,15 @@ object FlinkTypeFactory {
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DECIMAL => BIG_DEC_TYPE_INFO
 
+    // time indicators
+    case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+      val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+      if (indicator.isEventTime) {
+        TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      } else {
+        TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+      }
+
     // temporal types
     case DATE => SqlTimeTypeInfo.DATE
     case TIME => SqlTimeTypeInfo.TIME

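For context, a minimal sketch of how the new buildLogicalRowType is meant to be
called, assuming the FlinkTypeSystem from the same package; the field names and
indices below are illustrative, not taken from this commit:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}

    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)

    // two physical fields plus a rowtime attribute appended at index 2
    // and a proctime attribute appended at index 3
    val rowType = typeFactory.buildLogicalRowType(
      fieldNames = Seq("id", "amount"),
      fieldTypes = Seq(LONG_TYPE_INFO, DOUBLE_TYPE_INFO),
      rowtime = Some((2, "rowtime")),
      proctime = Some((3, "proctime")))

    // rowType has 4 fields; the last two carry TimeIndicatorRelDataType,
    // so isRowtimeIndicatorType/isProctimeIndicatorType hold for them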
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
new file mode 100644
index 0000000..fa2e3ee
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.functions.TimeMaterializationSqlFunction
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+
+import scala.collection.JavaConversions._
+
+/**
+  * Traverses a [[RelNode]] tree and converts fields of type [[TimeIndicatorRelDataType]]. If a
+  * time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed in
+  * some cases, but not all.
+  */
+class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
+
+  override def visit(project: LogicalProject): RelNode = {
+    // visit children and update inputs
+    val updatedProject = super.visit(project).asInstanceOf[LogicalProject]
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      updatedProject.getInput.getRowType.getFieldList.map(_.getType))
+    val newProjects = updatedProject.getProjects.map(_.accept(materializer))
+
+    // copy project
+    updatedProject.copy(
+      updatedProject.getTraitSet,
+      updatedProject.getInput,
+      newProjects,
+      buildRowType(updatedProject.getRowType.getFieldNames, newProjects.map(_.getType))
+    )
+  }
+
+  override def visit(filter: LogicalFilter): RelNode = {
+    // visit children and update inputs
+    val updatedFilter = super.visit(filter).asInstanceOf[LogicalFilter]
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      updatedFilter.getInput.getRowType.getFieldList.map(_.getType))
+    val newCondition = updatedFilter.getCondition.accept(materializer)
+
+    // copy filter
+    updatedFilter.copy(
+      updatedFilter.getTraitSet,
+      updatedFilter.getInput,
+      newCondition
+    )
+  }
+
+  override def visit(union: LogicalUnion): RelNode = {
+    // visit children and update inputs
+    val updatedUnion = super.visit(union).asInstanceOf[LogicalUnion]
+
+    // make sure that time indicator types match
+    val inputTypes = updatedUnion.getInputs.map(_.getRowType)
+
+    val head = inputTypes.head.getFieldList.map(_.getType)
+
+    val isValid = inputTypes.forall { t =>
+      val fieldTypes = t.getFieldList.map(_.getType)
+
+      fieldTypes.zip(head).forall { case (l, r) =>
+        // check if time indicators match
+        if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) {
+          val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime
+          val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime
+          leftTime == rightTime
+        }
+        // one side is not an indicator
+        else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) {
+          false
+        }
+        // uninteresting types
+        else {
+          true
+        }
+      }
+    }
+
+    if (!isValid) {
+      throw new ValidationException(
+        "Union fields with time attributes have different types.")
+    }
+
+    updatedUnion
+  }
+
+  override def visit(other: RelNode): RelNode = other match {
+    case scan: LogicalTableFunctionScan if
+        stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
+      // visit children and update inputs
+      val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
+
+      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
+
+      // check if input field contains time indicator type
+      // materialize field if no time indicator is present anymore
+      // if input field is already materialized, change to timestamp type
+      val materializer = new RexTimeIndicatorMaterializer(
+        rexBuilder,
+        correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
+      val newCall = updatedScan.getCall.accept(materializer)
+
+      // copy scan
+      updatedScan.copy(
+        updatedScan.getTraitSet,
+        updatedScan.getInputs,
+        newCall,
+        updatedScan.getElementType,
+        updatedScan.getRowType,
+        updatedScan.getColumnMappings
+      )
+
+    case _ =>
+      super.visit(other)
+  }
+
+  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = {
+    val fields = names.zipWithIndex.map { case (name, idx) =>
+      new RelDataTypeFieldImpl(name, idx, types(idx))
+    }
+    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
+  }
+}
+
+class RexTimeIndicatorMaterializer(
+    private val rexBuilder: RexBuilder,
+    private val input: Seq[RelDataType])
+  extends RexShuttle {
+
+  val timestamp = rexBuilder
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    // reference has a time indicator type and needs special handling
+    if (isTimeIndicatorType(inputRef.getType)) {
+      val resolvedRefType = input(inputRef.getIndex)
+      // input is a valid time indicator
+      if (isTimeIndicatorType(resolvedRefType)) {
+        inputRef
+      }
+      // input has been materialized
+      else {
+        new RexInputRef(inputRef.getIndex, resolvedRefType)
+      }
+    }
+    // reference is a regular field
+    else {
+      super.visitInputRef(inputRef)
+    }
+  }
+
+  override def visitCall(call: RexCall): RexNode = {
+    val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+    // skip materialization for special operators
+    updatedCall.getOperator match {
+      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+        return updatedCall
+
+      case _ => // do nothing
+    }
+
+    // materialize operands with time indicators
+    val materializedOperands = updatedCall.getOperands.map { o =>
+      if (isTimeIndicatorType(o.getType)) {
+        rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+      } else {
+        o
+      }
+    }
+
+    // remove time indicator return type
+    if (isTimeIndicatorType(updatedCall.getType)) {
+      updatedCall.clone(timestamp, materializedOperands)
+    } else {
+      updatedCall.clone(updatedCall.getType, materializedOperands)
+    }
+  }
+}
+
+object RelTimeIndicatorConverter {
+
+  def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode = {
+    val converter = new RelTimeIndicatorConverter(rexBuilder)
+    rootRel.accept(converter)
+  }
+}

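For context, the converter is intended to run once over the final logical plan.
A minimal sketch, assuming the planner holds the root RelNode and the RexBuilder
of the current cluster; the plan fragments in the comments are illustrative,
not actual optimizer output:

    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rex.RexBuilder
    import org.apache.flink.table.calcite.RelTimeIndicatorConverter

    def materializeTimeIndicators(root: RelNode, rexBuilder: RexBuilder): RelNode =
      // wraps every time indicator field that is used inside a computation
      // into a TimeMaterializationSqlFunction call; plain forwarding of the
      // attribute keeps the indicator type
      RelTimeIndicatorConverter.convert(root, rexBuilder)

    // conceptually, for a proctime attribute $2:
    //   before: LogicalFilter(condition=[>($2, x)])
    //   after:  LogicalFilter(condition=[>(TimeMaterialization($2), x)])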
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5bb3b0e..25addbc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable}
 import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
@@ -42,8 +41,8 @@ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
 import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.types.Row
@@ -59,19 +58,18 @@ import scala.collection.mutable
   * @param nullableInput input(s) can be null.
   * @param input1 type information about the first input of the Function
   * @param input2 type information about the second input if the Function is binary
-  * @param input1PojoFieldMapping additional mapping information if input1 is a POJO (POJO types
-  *                              have no deterministic field order).
-  * @param input2PojoFieldMapping additional mapping information if input2 is a POJO (POJO types
-  *                              have no deterministic field order).
-  *
+  * @param input1FieldMapping additional mapping information for input1
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+  * @param input2FieldMapping additional mapping information for input2
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
   */
 class CodeGenerator(
-   config: TableConfig,
-   nullableInput: Boolean,
-   input1: TypeInformation[_ <: Any],
-   input2: Option[TypeInformation[_ <: Any]] = None,
-   input1PojoFieldMapping: Option[Array[Int]] = None,
-   input2PojoFieldMapping: Option[Array[Int]] = None)
+    config: TableConfig,
+    nullableInput: Boolean,
+    input1: TypeInformation[_ <: Any],
+    input2: Option[TypeInformation[_ <: Any]] = None,
+    input1FieldMapping: Option[Array[Int]] = None,
+    input2FieldMapping: Option[Array[Int]] = None)
   extends RexVisitor[GeneratedExpression] {
 
   // check if nullCheck is enabled when inputs can be null
@@ -82,7 +80,7 @@ class CodeGenerator(
   // check for POJO input1 mapping
   input1 match {
     case pt: PojoTypeInfo[_] =>
-      input1PojoFieldMapping.getOrElse(
+      input1FieldMapping.getOrElse(
         throw new CodeGenException("No input mapping is specified for input1 of type POJO."))
     case _ => // ok
   }
@@ -90,11 +88,24 @@ class CodeGenerator(
   // check for POJO input2 mapping
   input2 match {
     case Some(pt: PojoTypeInfo[_]) =>
-      input2PojoFieldMapping.getOrElse(
+      input2FieldMapping.getOrElse(
         throw new CodeGenException("No input mapping is specified for input2 of type POJO."))
     case _ => // ok
   }
 
+  private val input1Mapping = input1FieldMapping match {
+    case Some(mapping) => mapping
+    case _ => (0 until input1.getArity).toArray
+  }
+
+  private val input2Mapping = input2FieldMapping match {
+    case Some(mapping) => mapping
+    case _ => input2 match {
+      case Some(input) => (0 until input.getArity).toArray
+      case _ => Array[Int]()
+    }
+  }
+
   /**
     * A code generator for generating unary Flink
     * [[org.apache.flink.api.common.functions.Function]]s with one input.
@@ -102,15 +113,15 @@ class CodeGenerator(
     * @param config configuration that determines runtime behavior
     * @param nullableInput input(s) can be null.
     * @param input type information about the input of the Function
-    * @param inputPojoFieldMapping additional mapping information necessary if input is a
-    *                              POJO (POJO types have no deterministic field order).
+    * @param inputFieldMapping additional mapping information necessary for input
+    *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
     */
   def this(
       config: TableConfig,
       nullableInput: Boolean,
       input: TypeInformation[Any],
-      inputPojoFieldMapping: Array[Int]) =
-    this(config, nullableInput, input, None, Some(inputPojoFieldMapping))
+      inputFieldMapping: Array[Int]) =
+    this(config, nullableInput, input, None, Some(inputFieldMapping))
 
   /**
     * A code generator for generating Flink input formats.
@@ -249,7 +260,7 @@ class CodeGenerator(
     * @param name        Class name of the function.
     *                    Does not need to be unique but has to be a valid Java class identifier.
     * @param generator   The code generator instance
-    * @param inputType   Input row type
+    * @param physicalInputTypes Physical input row types
     * @param aggregates  All aggregate functions
     * @param aggFields   Indexes of the input fields for all aggregate functions
     * @param aggMapping  The mapping of aggregates to output fields
@@ -270,7 +281,7 @@ class CodeGenerator(
   def generateAggregations(
       name: String,
       generator: CodeGenerator,
-      inputType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
       aggFields: Array[Array[Int]],
       aggMapping: Array[Int],
@@ -295,8 +306,7 @@ class CodeGenerator(
     val accTypes = accTypeClasses.map(_.getCanonicalName)
 
     // get java classes of input fields
-    val javaClasses = inputType.getFieldList
-      .map(f => FlinkTypeFactory.toTypeInfo(f.getType).getTypeClass)
+    val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
     // get parameter lists for aggregation functions
     val parameters = aggFields.map { inFields =>
       val fields = for (f <- inFields) yield
@@ -844,12 +854,12 @@ class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+    val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+      yield generateInputAccess(input1, input1Term, i, input1Mapping)
 
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity)
-        yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping)
+      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
+        yield generateInputAccess(ti, input2Term, i, input2Mapping)
       case None => Seq() // add nothing
     }
 
@@ -861,14 +871,14 @@ class CodeGenerator(
     */
   def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
     val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+      yield generateInputAccess(input1, input1Term, i, input1Mapping)
 
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity)
+      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
        // use generateFieldAccess instead of generateInputAccess so that the generated table
        // function's field access code is placed inside the while loop rather than at the
        // top of the function body
-        yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
+        yield generateFieldAccess(ti, input2Term, i, input2Mapping)
       case None => throw new CodeGenException("Type information of input2 must not be null.")
     }
     (input1AccessExprs, input2AccessExprs)
@@ -1123,11 +1133,11 @@ class CodeGenerator(
   override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
     // if inputRef index is within size of input1 we work with input1, input2 otherwise
     val input = if (inputRef.getIndex < input1.getArity) {
-      (input1, input1Term, input1PojoFieldMapping)
+      (input1, input1Term, input1Mapping)
     } else {
       (input2.getOrElse(throw new CodeGenException("Invalid input access.")),
         input2Term,
-        input2PojoFieldMapping)
+        input2Mapping)
     }
 
     val index = if (input._2 == input1Term) {
@@ -1146,7 +1156,7 @@ class CodeGenerator(
       refExpr.resultType,
       refExpr.resultTerm,
       index,
-      input1PojoFieldMapping)
+      input1Mapping)
 
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
@@ -1302,6 +1312,11 @@ class CodeGenerator(
     throw new CodeGenException("Dynamic parameter references are not supported yet.")
 
   override def visitCall(call: RexCall): GeneratedExpression = {
+    // time materialization is not implemented yet
+    if (call.getOperator == TimeMaterializationSqlFunction) {
+      throw new CodeGenException("Access to time attributes is not possible yet.")
+    }
+
     val operands = call.getOperands.map(_.accept(this))
     val resultType = FlinkTypeFactory.toTypeInfo(call.getType)
 
@@ -1546,7 +1561,7 @@ class CodeGenerator(
       inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
-      pojoFieldMapping: Option[Array[Int]])
+      fieldMapping: Array[Int])
     : GeneratedExpression = {
     // if input has been used before, we can reuse the code that
     // has already been generated
@@ -1558,9 +1573,9 @@ class CodeGenerator(
       // generate input access and unboxing if necessary
       case None =>
         val expr = if (nullableInput) {
-          generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+          generateNullableInputFieldAccess(inputType, inputTerm, index, fieldMapping)
         } else {
-          generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+          generateFieldAccess(inputType, inputTerm, index, fieldMapping)
         }
 
         reusableInputUnboxingExprs((inputTerm, index)) = expr
@@ -1574,7 +1589,7 @@ class CodeGenerator(
       inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
-      pojoFieldMapping: Option[Array[Int]])
+      fieldMapping: Array[Int])
     : GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
@@ -1582,7 +1597,7 @@ class CodeGenerator(
     val fieldType = inputType match {
       case ct: CompositeType[_] =>
         val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
-          pojoFieldMapping.get(index)
+          fieldMapping(index)
         }
         else {
           index
@@ -1593,7 +1608,7 @@ class CodeGenerator(
     }
     val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
     val defaultValue = primitiveDefaultValue(fieldType)
-    val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
+    val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, fieldMapping)
 
     val inputCheckCode =
       s"""
@@ -1617,12 +1632,12 @@ class CodeGenerator(
       inputType: TypeInformation[_],
       inputTerm: String,
       index: Int,
-      pojoFieldMapping: Option[Array[Int]])
+      fieldMapping: Array[Int])
     : GeneratedExpression = {
     inputType match {
       case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && pojoFieldMapping.nonEmpty) {
-          pojoFieldMapping.get(index)
+        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
+          fieldMapping(index)
         }
         else {
           index

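For context, the effect of the renamed input1FieldMapping/input2FieldMapping
parameters condensed into a standalone sketch (the helper name resolveMapping
is ad hoc, not part of this commit): if no mapping is given, the generator
falls back to an identity mapping over the input arity, and access expressions
are only generated for indexes contained in the mapping, which is how unread
fields (not just reordered POJO fields) are skipped:

    import org.apache.flink.api.common.typeinfo.TypeInformation

    // mirrors the new input1Mapping/input2Mapping initialization
    def resolveMapping(
        explicitMapping: Option[Array[Int]],
        input: Option[TypeInformation[_]]): Array[Int] =
      explicitMapping.getOrElse(
        input.map(i => (0 until i.getArity).toArray).getOrElse(Array.empty[Int]))

    // e.g. a 5-field input of which only fields 0, 2, and 4 are read:
    //   resolveMapping(Some(Array(0, 2, 4)), Some(inputType)) => Array(0, 2, 4)
    //   resolveMapping(None, Some(inputType))                 => Array(0, 1, 2, 3, 4)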
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index d9f394b..64280c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
-import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
 import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 
 import scala.collection.mutable
@@ -496,15 +495,6 @@ object FunctionGenerator {
         )
       )
 
-    // generate a constant for time indicator functions.
-    // this is a temporary solution and will be removed when FLINK-5884 is implemented.
-    case ProcTimeExtractor | EventTimeExtractor =>
-      Some(new CallGenerator {
-        override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {
-          GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP)
-        }
-      })
-
     // built-in scalar function
     case _ =>
       sqlFunctions.get((sqlOperator, operandTypes))

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 4b5781f..08abc8f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -28,10 +28,49 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 
 object ExpressionUtils {
 
+  private[flink] def isTimeIntervalLiteral(expr: Expression): Boolean = expr match {
+    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+    case _ => false
+  }
+
+  private[flink] def isRowCountLiteral(expr: Expression): Boolean = expr match {
+    case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true
+    case _ => false
+  }
+
+  private[flink] def isTimeAttribute(expr: Expression): Boolean = expr match {
+    case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => true
+    case _ => false
+  }
+
+  private[flink] def isRowtimeAttribute(expr: Expression): Boolean = expr match {
+    case r: ResolvedFieldReference if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) => true
+    case _ => false
+  }
+
+  private[flink] def isProctimeAttribute(expr: Expression): Boolean = expr match {
+    case r: ResolvedFieldReference if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  private[flink] def toTime(expr: Expression): FlinkTime = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+      FlinkTime.milliseconds(value)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  private[flink] def toLong(expr: Expression): Long = expr match {
+    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case _ => throw new IllegalArgumentException()
+  }
+
   private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
     case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
       Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)

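For context, a small usage sketch for the new helpers; note they are
private[flink], so this only compiles inside the org.apache.flink namespace,
and the literal is invented for illustration:

    import org.apache.flink.table.expressions.{ExpressionUtils, Literal}
    import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

    // a 10-minute interval literal, as produced by e.g. 10.minutes in the Table API
    val interval = Literal(10 * 60 * 1000L, TimeIntervalTypeInfo.INTERVAL_MILLIS)

    ExpressionUtils.isTimeIntervalLiteral(interval)  // true
    ExpressionUtils.isRowCountLiteral(interval)      // false
    ExpressionUtils.toTime(interval)                 // Time.milliseconds(600000)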
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index 5f7204a..13f8a11 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -110,18 +110,11 @@ case class OverCall(
     val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
 
     // assemble order by key
-    val orderKey = orderBy match {
-      case _: RowTime =>
-        new RexFieldCollation(relBuilder.call(EventTimeExtractor), Set[SqlKind]().asJava)
-      case _: ProcTime =>
-        new RexFieldCollation(relBuilder.call(ProcTimeExtractor), Set[SqlKind]().asJava)
-      case _ =>
-        throw new ValidationException("Invalid OrderBy expression.")
-    }
+    val orderKey = new RexFieldCollation(orderBy.toRexNode, Set[SqlKind]().asJava)
     val orderKeys = ImmutableList.of(orderKey)
 
     // assemble partition by keys
-    val partitionKeys = partitionBy.map(_.toRexNode(relBuilder)).asJava
+    val partitionKeys = partitionBy.map(_.toRexNode).asJava
 
     // assemble bounds
     val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
@@ -249,6 +242,11 @@ case class OverCall(
         return ValidationFailure("Preceding and following must be of same interval type.")
     }
 
+    // check time field
+    if (!ExpressionUtils.isTimeAttribute(orderBy)) {
+      return ValidationFailure("Ordering must be defined on a time attribute.")
+    }
+
     ValidationSuccess
   }
 }
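For context, with the new check an over window must be ordered on a time
attribute. A sketch in the Scala Table API, assuming a table that declares a
proctime attribute (table and field names invented):

    import org.apache.flink.table.api.scala._

    // table defined e.g. via tEnv.fromDataStream(stream, 'id, 'amount, 'proctime.proctime)
    val result = table
      .window(Over partitionBy 'id orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select('id, 'amount.sum over 'w)

    // ordering on a regular field now fails validation with
    // "Ordering must be defined on a time attribute."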