Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:38 UTC

[04/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 0885929..362d846 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -19,8 +19,9 @@ package org.apache.flink.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 trait NamedExpression extends Expression {
@@ -116,24 +117,6 @@ case class UnresolvedAlias(child: Expression) extends UnaryExpression with Named
   override private[flink] lazy val valid = false
 }
 
-case class RowtimeAttribute() extends Attribute {
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == "rowtime") {
-      this
-    } else {
-      throw new ValidationException("Cannot rename streaming rowtime attribute.")
-    }
-  }
-
-  override private[flink] def name: String = "rowtime"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
-}
-
 case class WindowReference(name: String) extends Attribute {
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
@@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
     }
   }
 }
+
+abstract class TimeAttribute(val expression: Expression)
+  extends UnaryExpression
+  with NamedExpression {
+
+  override private[flink] def child: Expression = expression
+
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
+  override private[flink] def toAttribute: Attribute =
+    throw new UnsupportedOperationException("Time attribute can not be used solely.")
+}
+
+case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+}
+
+case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+}
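
To make the new expressions concrete, here is a minimal sketch of how time
indicators are declared during schema definition in the Scala Table API
(illustrative only; it assumes the .rowtime/.proctime DSL suffixes that
accompany this change, and that timestamps and watermarks are assigned
upstream for the event-time case):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(String, Int)] = env.fromElements(("a", 1))

    // 'rowtime.rowtime parses to RowtimeAttribute('rowtime),
    // 'proctime.proctime to ProctimeAttribute('proctime)
    val table = stream.toTable(tEnv, 'name, 'amount, 'rowtime.rowtime, 'proctime.proctime)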

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
new file mode 100644
index 0000000..d875026
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.validate.SqlMonotonicity
+
+/**
+  * Function that materializes a time attribute to the metadata timestamp. After materialization
+  * the result can be used in regular arithmetic calculations.
+  */
+object TimeMaterializationSqlFunction
+  extends SqlFunction(
+    "TIME_MATERIALIZATION",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.TIMESTAMP),
+    SqlFunctionCategory.SYSTEM) {
+
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+}
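
A sketch of how the planner is meant to use this operator (illustrative;
RexBuilder is Calcite's standard expression factory, and the rewriting rule
itself is not part of this hunk):

    import org.apache.calcite.rex.{RexBuilder, RexNode}
    import org.apache.flink.table.functions.TimeMaterializationSqlFunction

    // wraps a time indicator expression so that downstream operators see a
    // plain TIMESTAMP value instead of the indicator type
    def materialize(rexBuilder: RexBuilder, timeIndicator: RexNode): RexNode =
      rexBuilder.makeCall(TimeMaterializationSqlFunction, timeIndicator)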

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
deleted file mode 100644
index 3ddcbdc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions
-
-import java.nio.charset.Charset
-import java.util
-
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
-import org.apache.calcite.sql.validate.SqlMonotonicity
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.LeafExpression
-
-object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
-  ReturnTypes.explicit(TimeModeTypes.ROWTIME), null, OperandTypes.NILADIC,
-  SqlFunctionCategory.SYSTEM) {
-  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
-
-  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
-    SqlMonotonicity.INCREASING
-}
-
-object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
-  ReturnTypes.explicit(TimeModeTypes.PROCTIME), null, OperandTypes.NILADIC,
-  SqlFunctionCategory.SYSTEM) {
-  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
-
-  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
-    SqlMonotonicity.INCREASING
-}
-
-abstract class TimeIndicator extends LeafExpression {
-  /**
-    * Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]]
-    * for evaluating this expression.
-    * It is sometimes not available until the expression is valid.
-    */
-  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-
-  /**
-    * Convert Expression to its counterpart in Calcite, i.e. RexNode
-    */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
-    throw new TableException("indicator functions (e.g. proctime() and rowtime()" +
-      " are not executable. Please check your expressions.")
-}
-
-case class RowTime() extends TimeIndicator
-case class ProcTime() extends TimeIndicator
-
-object TimeModeTypes {
-
-  // indicator data type for row time (event time)
-  val ROWTIME = new RowTimeType
-  // indicator data type for processing time
-  val PROCTIME = new ProcTimeType
-
-}
-
-class RowTimeType extends TimeModeType {
-
-  override def toString(): String = "ROWTIME"
-  override def getFullTypeString: String = "ROWTIME_INDICATOR"
-}
-
-class ProcTimeType extends TimeModeType {
-
-  override def toString(): String = "PROCTIME"
-  override def getFullTypeString: String = "PROCTIME_INDICATOR"
-}
-
-abstract class TimeModeType extends RelDataType {
-
-  override def getComparability: RelDataTypeComparability = RelDataTypeComparability.NONE
-
-  override def isStruct: Boolean = false
-
-  override def getFieldList: util.List[RelDataTypeField] = null
-
-  override def getFieldNames: util.List[String] = null
-
-  override def getFieldCount: Int = 0
-
-  override def getStructKind: StructKind = StructKind.NONE
-
-  override def getField(
-     fieldName: String,
-     caseSensitive: Boolean,
-     elideRecord: Boolean): RelDataTypeField = null
-
-  override def isNullable: Boolean = false
-
-  override def getComponentType: RelDataType = null
-
-  override def getKeyType: RelDataType = null
-
-  override def getValueType: RelDataType = null
-
-  override def getCharset: Charset = null
-
-  override def getCollation: SqlCollation = null
-
-  override def getIntervalQualifier: SqlIntervalQualifier = null
-
-  override def getPrecision: Int = -1
-
-  override def getScale: Int = -1
-
-  override def getSqlTypeName: SqlTypeName = SqlTypeName.TIMESTAMP
-
-  override def getSqlIdentifier: SqlIdentifier = null
-
-  override def getFamily: RelDataTypeFamily = SqlTypeFamily.NUMERIC
-
-  override def getPrecedenceList: RelDataTypePrecedenceList = ???
-
-  override def isDynamicStruct: Boolean = false
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index d26cdcf..98a7e63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -19,9 +19,8 @@
 package org.apache.flink.table.plan
 
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{OverWindow, StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{OverWindow, TableEnvironment}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{ProcTime, RowTime}
 import org.apache.flink.table.plan.logical.{LogicalNode, Project}
 
 import scala.collection.mutable
@@ -231,28 +230,12 @@ object ProjectionTranslator {
 
       val overWindow = overWindows.find(_.alias.equals(unresolvedCall.alias))
       if (overWindow.isDefined) {
-        if (tEnv.isInstanceOf[StreamTableEnvironment]) {
-          val timeIndicator = overWindow.get.orderBy match {
-            case u: UnresolvedFieldReference if u.name.toLowerCase == "rowtime" =>
-              RowTime()
-            case u: UnresolvedFieldReference if u.name.toLowerCase == "proctime" =>
-              ProcTime()
-            case e: Expression => e
-          }
-          OverCall(
-            unresolvedCall.agg,
-            overWindow.get.partitionBy,
-            timeIndicator,
-            overWindow.get.preceding,
-            overWindow.get.following)
-        } else {
-          OverCall(
-            unresolvedCall.agg,
-            overWindow.get.partitionBy,
-            overWindow.get.orderBy,
-            overWindow.get.preceding,
-            overWindow.get.following)
-        }
+        OverCall(
+          unresolvedCall.agg,
+          overWindow.get.partitionBy,
+          overWindow.get.orderBy,
+          overWindow.get.preceding,
+          overWindow.get.following)
       } else {
         unresolvedCall
       }
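
The Table API effect of this simplification: over-windows no longer
special-case the field names "rowtime" and "proctime" but simply order by a
declared time attribute, so stream and batch share one code path. A minimal
sketch (reusing a table whose schema declares a rowtime attribute, as in the
sketch further above):

    import org.apache.flink.table.api.scala._

    val result = table
      .window(Over partitionBy 'name orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
      .select('name, 'amount.sum over 'w)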

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 1884e54..92dc501 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -22,14 +22,24 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.{Expression, WindowReference}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
-abstract class LogicalWindow(val alias: Expression) extends Resolvable[LogicalWindow] {
+/**
+  * Logical super class for all types of windows (group-windows and row-windows).
+  *
+  * @param aliasAttribute window alias
+  * @param timeAttribute time field indicating event-time or processing-time
+  */
+abstract class LogicalWindow(
+    val aliasAttribute: Expression,
+    val timeAttribute: Expression)
+  extends Resolvable[LogicalWindow] {
 
   def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
 
-  def validate(tableEnv: TableEnvironment): ValidationResult = alias match {
+  def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute match {
     case WindowReference(_) => ValidationSuccess
     case _ => ValidationFailure("Window reference for window expected.")
   }
 
   override def toString: String = getClass.getSimpleName
+
 }
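
An illustration of the new contract (hypothetical subclass; the real
implementations follow in groupWindows.scala below):

    import org.apache.flink.table.expressions.{Expression, UnresolvedFieldReference, WindowReference}
    import org.apache.flink.table.plan.logical.LogicalWindow

    // every concrete window now hands both its alias and its time field
    // to the super class
    case class ExampleWindow(alias: Expression, timeField: Expression)
      extends LogicalWindow(alias, timeField)

    val w = ExampleWindow(WindowReference("w"), UnresolvedFieldReference("rowtime"))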

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 576756d..3e5de28 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -18,259 +18,165 @@
 
 package org.apache.flink.table.plan.logical
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isLong, isTimePoint}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
-abstract class EventTimeGroupWindow(
-    alias: Expression,
-    time: Expression)
-  extends LogicalWindow(alias) {
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult = {
-    val valid = super.validate(tableEnv)
-    if (valid.isFailure) {
-        return valid
-    }
-
-    tableEnv match {
-      case _: StreamTableEnvironment =>
-        time match {
-          case RowtimeAttribute() =>
-            ValidationSuccess
-          case _ =>
-            ValidationFailure("Event-time window expects a 'rowtime' time field.")
-      }
-      case _: BatchTableEnvironment =>
-        if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
-          ValidationFailure(s"Event-time window expects a time field that can be safely cast " +
-            s"to Long, but is ${time.resultType}")
-        } else {
-          ValidationSuccess
-        }
-    }
-
-  }
-}
-
-abstract class ProcessingTimeGroupWindow(alias: Expression) extends LogicalWindow(alias) {
-  override def validate(tableEnv: TableEnvironment): ValidationResult = {
-    val valid = super.validate(tableEnv)
-    if (valid.isFailure) {
-      return valid
-    }
-
-    tableEnv match {
-      case b: BatchTableEnvironment => ValidationFailure(
-        "Window on batch must declare a time attribute over which the query is evaluated.")
-      case _ =>
-        ValidationSuccess
-    }
-  }
-}
-
 // ------------------------------------------------------------------------------------------------
 // Tumbling group windows
 // ------------------------------------------------------------------------------------------------
 
-object TumblingGroupWindow {
-  def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match {
-    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-      ValidationSuccess
-    case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
-      ValidationSuccess
-    case _ =>
-      ValidationFailure("Tumbling window expects size literal of type Interval of Milliseconds " +
-        "or Interval of Rows.")
-  }
-}
-
-case class ProcessingTimeTumblingGroupWindow(
-    override val alias: Expression,
-    size: Expression)
-  extends ProcessingTimeGroupWindow(alias) {
-
-  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    ProcessingTimeTumblingGroupWindow(
-      resolve(alias),
-      resolve(size))
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
-
-  override def toString: String = s"ProcessingTimeTumblingGroupWindow($alias, $size)"
-}
-
-case class EventTimeTumblingGroupWindow(
-    override val alias: Expression,
+case class TumblingGroupWindow(
+    alias: Expression,
     timeField: Expression,
     size: Expression)
-  extends EventTimeGroupWindow(
+  extends LogicalWindow(
     alias,
     timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    EventTimeTumblingGroupWindow(
+    TumblingGroupWindow(
       resolve(alias),
       resolve(timeField),
       resolve(size))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv)
-      .orElse(TumblingGroupWindow.validate(tableEnv, size))
-      .orElse(size match {
-        case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
-          if tableEnv.isInstanceOf[StreamTableEnvironment] =>
+    super.validate(tableEnv).orElse(
+      tableEnv match {
+
+        // check size
+        case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
+          ValidationFailure(
+            "Tumbling window expects size literal of type Interval of Milliseconds " +
+              "or Interval of Rows.")
+
+        // check time attribute
+        case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+          ValidationFailure(
+            "Tumbling window expects a time attribute for grouping in a stream environment.")
+        case _: BatchTableEnvironment
+            if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
+          ValidationFailure(
+            "Tumbling window expects a time attribute for grouping in a batch environment.")
+
+        // check row intervals on event-time
+        case _: StreamTableEnvironment
+            if isRowCountLiteral(size) && isRowtimeAttribute(timeField) =>
           ValidationFailure(
             "Event-time grouping windows on row intervals in a stream environment " +
               "are currently not supported.")
+
         case _ =>
           ValidationSuccess
-      })
+      }
+    )
 
-  override def toString: String = s"EventTimeTumblingGroupWindow($alias, $timeField, $size)"
+  override def toString: String = s"TumblingGroupWindow($alias, $timeField, $size)"
 }
 
 // ------------------------------------------------------------------------------------------------
 // Sliding group windows
 // ------------------------------------------------------------------------------------------------
 
-object SlidingGroupWindow {
-  def validate(
-      tableEnv: TableEnvironment,
-      size: Expression,
-      slide: Expression)
-    : ValidationResult = {
-
-    val checkedSize = size match {
-      case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-        ValidationSuccess
-      case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
-        ValidationSuccess
-      case _ =>
-        ValidationFailure("Sliding window expects size literal of type Interval of " +
-          "Milliseconds or Interval of Rows.")
-    }
-
-    val checkedSlide = slide match {
-      case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-        ValidationSuccess
-      case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
-        ValidationSuccess
-      case _ =>
-        ValidationFailure("Sliding window expects slide literal of type Interval of " +
-          "Milliseconds or Interval of Rows.")
-    }
-
-    checkedSize
-      .orElse(checkedSlide)
-      .orElse {
-        if (size.resultType != slide.resultType) {
-          ValidationFailure("Sliding window expects same type of size and slide.")
-        } else {
-          ValidationSuccess
-        }
-      }
-  }
-}
-
-case class ProcessingTimeSlidingGroupWindow(
-    override val alias: Expression,
+case class SlidingGroupWindow(
+    alias: Expression,
+    timeField: Expression,
     size: Expression,
     slide: Expression)
-  extends ProcessingTimeGroupWindow(alias) {
+  extends LogicalWindow(
+    alias,
+    timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    ProcessingTimeSlidingGroupWindow(
+    SlidingGroupWindow(
       resolve(alias),
+      resolve(timeField),
       resolve(size),
       resolve(slide))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
+    super.validate(tableEnv).orElse(
+      tableEnv match {
 
-  override def toString: String = s"ProcessingTimeSlidingGroupWindow($alias, $size, $slide)"
-}
+        // check size
+        case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
+          ValidationFailure(
+            "Sliding window expects size literal of type Interval of Milliseconds " +
+              "or Interval of Rows.")
 
-case class EventTimeSlidingGroupWindow(
-    override val alias: Expression,
-    timeField: Expression,
-    size: Expression,
-    slide: Expression)
-  extends EventTimeGroupWindow(alias, timeField) {
+        // check slide
+        case _ if !isTimeIntervalLiteral(slide) && !isRowCountLiteral(slide) =>
+          ValidationFailure(
+            "Sliding window expects slide literal of type Interval of Milliseconds " +
+              "or Interval of Rows.")
 
-  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    EventTimeSlidingGroupWindow(
-      resolve(alias),
-      resolve(timeField),
-      resolve(size),
-      resolve(slide))
+        // check same type of intervals
+        case _ if isTimeIntervalLiteral(size) != isTimeIntervalLiteral(slide) =>
+          ValidationFailure("Sliding window expects same type of size and slide.")
 
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv)
-      .orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
-      .orElse(size match {
-        case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
-          if tableEnv.isInstanceOf[StreamTableEnvironment] =>
+        // check time attribute
+        case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+          ValidationFailure(
+            "Sliding window expects a time attribute for grouping in a stream environment.")
+        case _: BatchTableEnvironment
+            if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
+          ValidationFailure(
+            "Sliding window expects a time attribute for grouping in a batch environment.")
+
+        // check row intervals on event-time
+        case _: StreamTableEnvironment
+            if isRowCountLiteral(size) && isRowtimeAttribute(timeField) =>
           ValidationFailure(
             "Event-time grouping windows on row intervals in a stream environment " +
               "are currently not supported.")
+
         case _ =>
           ValidationSuccess
-      })
+      }
+    )
 
-  override def toString: String = s"EventTimeSlidingGroupWindow($alias, $timeField, $size, $slide)"
+  override def toString: String = s"SlidingGroupWindow($alias, $timeField, $size, $slide)"
 }
 
 // ------------------------------------------------------------------------------------------------
 // Session group windows
 // ------------------------------------------------------------------------------------------------
 
-object SessionGroupWindow {
-
-  def validate(tableEnv: TableEnvironment, gap: Expression): ValidationResult = gap match {
-    case Literal(timeInterval: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-      ValidationSuccess
-    case _ =>
-      ValidationFailure(
-        "Session window expects gap literal of type Interval of Milliseconds.")
-  }
-}
-
-case class ProcessingTimeSessionGroupWindow(
-    override val alias: Expression,
-    gap: Expression)
-  extends ProcessingTimeGroupWindow(alias) {
-
-  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    ProcessingTimeSessionGroupWindow(
-      resolve(alias),
-      resolve(gap))
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
-
-  override def toString: String = s"ProcessingTimeSessionGroupWindow($alias, $gap)"
-}
-
-case class EventTimeSessionGroupWindow(
-    override val alias: Expression,
+case class SessionGroupWindow(
+    alias: Expression,
     timeField: Expression,
     gap: Expression)
-  extends EventTimeGroupWindow(
+  extends LogicalWindow(
     alias,
     timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    EventTimeSessionGroupWindow(
+    SessionGroupWindow(
       resolve(alias),
       resolve(timeField),
       resolve(gap))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
+    super.validate(tableEnv).orElse(
+      tableEnv match {
+
+        // check gap
+        case _ if !isTimeIntervalLiteral(gap) =>
+          ValidationFailure(
+            "Session window expects gap literal of type Interval of Milliseconds.")
+
+        // check time attribute
+        case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+          ValidationFailure(
+            "Session window expects a time attribute for grouping in a stream environment.")
+        case _: BatchTableEnvironment
+            if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
+          ValidationFailure(
+            "Session window expects a time attribute for grouping in a batch environment.")
+
+        case _ =>
+          ValidationSuccess
+      }
+    )
 
-  override def toString: String = s"EventTimeSessionGroupWindow($alias, $timeField, $gap)"
+  override def toString: String = s"SessionGroupWindow($alias, $timeField, $gap)"
 }
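
In Table API terms, the merged validation accepts the following (illustrative;
reuses a table with a declared rowtime attribute as sketched earlier):

    import org.apache.flink.table.api.scala._

    val windowed = table
      .window(Tumble over 10.minutes on 'rowtime as 'w) // interval on a time attribute: valid
      .groupBy('w, 'name)
      .select('name, 'amount.sum, 'w.start, 'w.end)

    // still rejected on streams: row-count windows over event time, e.g.
    //   Tumble over 100.rows on 'rowtime as 'w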

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 5f2394c..3839145 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -70,8 +70,6 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
     def checkName(name: String): Unit = {
       if (names.contains(name)) {
         failValidation(s"Duplicate field name $name.")
-      } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && name == "rowtime") {
-        failValidation("'rowtime' cannot be used as field name in a streaming environment.")
       } else {
         names.add(name)
       }
@@ -112,10 +110,6 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una
       failValidation("Alias only accept name expressions as arguments")
     } else if (!aliasList.forall(_.asInstanceOf[UnresolvedFieldReference].name != "*")) {
       failValidation("Alias can not accept '*' as name")
-    } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && !aliasList.forall {
-          case UnresolvedFieldReference(name) => name != "rowtime"
-        }) {
-      failValidation("'rowtime' cannot be used as field name in a streaming environment.")
     } else {
       val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
       val input = child.output
@@ -561,26 +555,20 @@ case class WindowAggregate(
   override def resolveReference(
       tableEnv: TableEnvironment,
       name: String)
-    : Option[NamedExpression] = tableEnv match {
-    // resolve reference to rowtime attribute in a streaming environment
-    case _: StreamTableEnvironment if name == "rowtime" =>
-      Some(RowtimeAttribute())
-    case _ =>
-      window.alias match {
-        // resolve reference to this window's alias
-        case UnresolvedFieldReference(alias) if name == alias =>
-          // check if reference can already be resolved by input fields
-          val found = super.resolveReference(tableEnv, name)
-          if (found.isDefined) {
-            failValidation(s"Reference $name is ambiguous.")
-          } else {
-            Some(WindowReference(name))
-          }
-        case _ =>
-          // resolve references as usual
-          super.resolveReference(tableEnv, name)
-      }
-  }
+    : Option[NamedExpression] = window.aliasAttribute match {
+      // resolve reference to this window's name
+      case UnresolvedFieldReference(alias) if name == alias =>
+        // check if reference can already be resolved by input fields
+        val found = super.resolveReference(tableEnv, name)
+        if (found.isDefined) {
+          failValidation(s"Reference $name is ambiguous.")
+        } else {
+          Some(WindowReference(name))
+        }
+      case _ =>
+        // resolve references as usual
+        super.resolveReference(tableEnv, name)
+    }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
     val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 96a7470..5c35129 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
@@ -35,21 +34,30 @@ import scala.collection.JavaConverters._
 trait CommonCalc {
 
   private[flink] def functionBody(
-      generator: CodeGenerator,
-      inputType: TypeInformation[Row],
-      rowType: RelDataType,
-      calcProgram: RexProgram,
-      config: TableConfig)
+      generator: CodeGenerator,
+      inputSchema: RowSchema,
+      returnSchema: RowSchema,
+      calcProgram: RexProgram,
+      config: TableConfig)
     : String = {
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+    val expandedExpressions = calcProgram
+      .getProjectList
+      .map(expr => calcProgram.expandLocalRef(expr))
+      // time indicator fields must not be part of the code generation
+      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+      // update indices
+      .map(expr => inputSchema.mapRexNode(expr))
+
+    val condition = if (calcProgram.getCondition != null) {
+      inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
+    } else {
+      null
+    }
 
-    val condition = calcProgram.getCondition
-    val expandedExpressions = calcProgram.getProjectList.map(
-      expr => calcProgram.expandLocalRef(expr))
     val projection = generator.generateResultExpression(
-      returnType,
-      rowType.getFieldNames,
+      returnSchema.physicalTypeInfo,
+      returnSchema.physicalFieldNames,
       expandedExpressions)
 
     // only projection
@@ -60,8 +68,7 @@ trait CommonCalc {
         |""".stripMargin
     }
     else {
-      val filterCondition = generator.generateExpression(
-        calcProgram.expandLocalRef(calcProgram.getCondition))
+      val filterCondition = generator.generateExpression(condition)
       // only filter
       if (projection == null) {
         s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 6c4066b..02305ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -23,11 +23,11 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
 import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
 import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
 import org.apache.flink.types.Row
 
@@ -44,9 +44,9 @@ trait CommonCorrelate {
     */
   private[flink] def correlateMapFunction(
       config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
+      inputSchema: RowSchema,
       udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
+      returnSchema: RowSchema,
       joinType: SemiJoinType,
       rexCall: RexCall,
       condition: Option[RexNode],
@@ -54,26 +54,24 @@ trait CommonCorrelate {
       ruleDescription: String)
     : CorrelateFlatMapRunner[Row, Row] = {
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
-
     val flatMap = generateFunction(
       config,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       udtfTypeInfo,
-      returnType,
-      rowType,
+      returnSchema.physicalTypeInfo,
+      returnSchema.logicalFieldNames,
       joinType,
-      rexCall,
+      inputSchema.mapRexNode(rexCall).asInstanceOf[RexCall],
       pojoFieldMapping,
       ruleDescription)
 
     val collector = generateCollector(
       config,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
+      returnSchema.physicalTypeInfo,
+      returnSchema.logicalFieldNames,
+      condition.map(inputSchema.mapRexNode),
       pojoFieldMapping)
 
     new CorrelateFlatMapRunner[Row, Row](
@@ -93,7 +91,7 @@ trait CommonCorrelate {
       inputTypeInfo: TypeInformation[Row],
       udtfTypeInfo: TypeInformation[Any],
       returnType: TypeInformation[Row],
-      rowType: RelDataType,
+      resultFieldNames: Seq[String],
       joinType: SemiJoinType,
       rexCall: RexCall,
       pojoFieldMapping: Option[Array[Int]],
@@ -134,7 +132,7 @@ trait CommonCorrelate {
           x.resultType)
       }
       val outerResultExpr = functionGenerator.generateResultExpression(
-        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+        input1AccessExprs ++ input2NullExprs, returnType, resultFieldNames)
       body +=
         s"""
           |boolean hasOutput = $collectorTerm.isCollected();
@@ -162,7 +160,7 @@ trait CommonCorrelate {
       inputTypeInfo: TypeInformation[Row],
       udtfTypeInfo: TypeInformation[Any],
       returnType: TypeInformation[Row],
-      rowType: RelDataType,
+      resultFieldNames: Seq[String],
       condition: Option[RexNode],
       pojoFieldMapping: Option[Array[Int]])
     : GeneratedCollector = {
@@ -180,7 +178,7 @@ trait CommonCorrelate {
     val crossResultExpr = generator.generateResultExpression(
       input1AccessExprs ++ input2AccessExprs,
       returnType,
-      rowType.getFieldNames.asScala)
+      resultFieldNames)
 
     val collectorCode = if (condition.isEmpty) {
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 0a0d204..091a1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.types.Row
 
 /**
@@ -42,21 +41,22 @@ trait CommonScan {
     externalTypeInfo != internalTypeInfo
   }
 
-  private[flink] def getConversionMapper(
+  private[flink] def generatedConversionFunction[F <: Function](
       config: TableConfig,
+      functionClass: Class[F],
       inputType: TypeInformation[Any],
       expectedType: TypeInformation[Row],
       conversionOperatorName: String,
       fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Row] = {
+      inputFieldMapping: Option[Array[Int]] = None)
+    : GeneratedFunction[F, Row] = {
 
     val generator = new CodeGenerator(
       config,
       false,
       inputType,
       None,
-      inputPojoFieldMapping)
+      inputFieldMapping)
     val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
 
     val body =
@@ -65,17 +65,11 @@ trait CommonScan {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       conversionOperatorName,
-      classOf[MapFunction[Any, Row]],
+      functionClass,
       body,
       expectedType)
-
-    new MapRunner[Any, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
index 6878473..1048549 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -18,14 +18,13 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.calcite.rel.{RelFieldCollation, RelNode}
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
-import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.core.Window.Group
-import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rex.{RexInputRef}
+import org.apache.calcite.rel.{RelFieldCollation, RelNode}
+import org.apache.calcite.rex.RexInputRef
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
 
 import scala.collection.JavaConverters._
 
@@ -43,7 +42,7 @@ trait OverAggregate {
     val inFields = inputType.getFieldList.asScala
 
     val orderingString = orderFields.asScala.map {
-      x => inFields(x.getFieldIndex).getValue
+      x => inFields(x.getFieldIndex).getName
     }.mkString(", ")
 
     orderingString
@@ -66,24 +65,8 @@ trait OverAggregate {
     rowType: RelDataType,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
 
-    val inFields = inputType.getFieldList.asScala.map {
-      x =>
-        x.asInstanceOf[RelDataTypeFieldImpl].getType
-        match {
-          case proceTime: ProcTimeType => "PROCTIME"
-          case rowTime: RowTimeType => "ROWTIME"
-          case _ => x.asInstanceOf[RelDataTypeFieldImpl].getName
-        }
-    }
-    val outFields = rowType.getFieldList.asScala.map {
-      x =>
-        x.asInstanceOf[RelDataTypeFieldImpl].getType
-        match {
-          case proceTime: ProcTimeType => "PROCTIME"
-          case rowTime: RowTimeType => "ROWTIME"
-          case _ => x.asInstanceOf[RelDataTypeFieldImpl].getName
-        }
-    }
+    val inFields = inputType.getFieldNames.asScala
+    val outFields = rowType.getFieldNames.asScala
 
     val aggStrings = namedAggregates.map(_.getKey).map(
       a => s"${a.getAggregation}(${
@@ -109,7 +92,7 @@ trait OverAggregate {
     input: RelNode): Long = {
 
     val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
-    val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex;
+    val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex
     val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
     lowerBound match {
       case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index d924450..c18c3d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -37,9 +37,11 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
+    flinkTypeFactory.buildLogicalRowType(
       TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+      TableEnvironment.getFieldTypes(tableSource.getReturnType),
+      None,
+      None)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index b39b8ed..cc5d9fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
@@ -43,17 +45,23 @@ trait BatchScan extends CommonScan with DataSetRel {
     // conversion
     if (needsConversion(inputType, internalType)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generatedConversionFunction(
         config,
+        classOf[MapFunction[Any, Row]],
         inputType,
         internalType,
         "DataSetSourceConversion",
         getRowType.getFieldNames,
         Some(flinkTable.fieldIndexes))
 
+      val runner = new MapRunner[Any, Row](
+        function.name,
+        function.code,
+        function.returnType)
+
       val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-      input.map(mapFunc).name(opName)
+      input.map(runner).name(opName)
     }
     // no conversion necessary, forward
     else {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index bf4291a..fb291e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -37,7 +38,16 @@ class BatchTableSourceScan(
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
   with BatchScan {
 
-  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType),
+      None,
+      None)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index b92775c..c22dc54 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -91,6 +91,9 @@ class DataSetAggregate(
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val input = inputNode.asInstanceOf[DataSetRel]
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
     val generator = new CodeGenerator(
       tableEnv.getConfig,
@@ -104,15 +107,14 @@ class DataSetAggregate(
       ) = AggregateUtil.createDataSetAggregateFunctions(
         generator,
         namedAggregates,
-        inputType,
+        input.getRowType,
+        inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
         rowRelDataType,
         grouping,
         inGroupingSet)
 
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
 
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
-
     if (grouping.length > 0) {
       // grouped aggregation
       val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e05b5a8..9e18082 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -26,10 +26,13 @@ import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -83,14 +86,14 @@ class DataSetCalc(
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
     val body = functionBody(
       generator,
-      inputDS.getType,
-      getRowType,
+      new RowSchema(getInput.getRowType),
+      new RowSchema(getRowType),
       calcProgram,
       config)
 
@@ -98,9 +101,13 @@ class DataSetCalc(
       ruleDescription,
       classOf[FlatMapFunction[Row, Row]],
       body,
-      returnType)
+      rowTypeInfo)
+
+    val runner = new FlatMapRunner[Row, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
 
-    val mapFunc = calcMapFunction(genFunction)
-    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+    inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 2a62e21..6c79b45 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -25,10 +25,13 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 /**
@@ -98,11 +101,13 @@ class DataSetCorrelate(
     val pojoFieldMapping = sqlFunction.getPojoFieldMapping
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
     val mapFunc = correlateMapFunction(
       config,
-      inputDS.getType,
+      new RowSchema(getInput.getRowType),
       udtfTypeInfo,
-      getRowType,
+      new RowSchema(getRowType),
       joinType,
       rexCall,
       condition,
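
A note on the RowSchema arguments introduced here and in the other translators: a RowSchema separates the logical row type (which may still contain time indicator fields) from the physical row type that rows carry at runtime. The following is a much simplified, hypothetical stand-in that models only the physical side (the real RowSchema is constructed from a Calcite RelDataType):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // hypothetical stand-in for RowSchema, physical side only
    class SketchRowSchema(val physicalTypeInfo: RowTypeInfo) {

      // number of fields in the physical row
      def physicalArity: Int = physicalTypeInfo.getArity

      // per-field type information, as passed to the aggregate translators
      def physicalFieldTypeInfo: Array[TypeInformation[_]] =
        physicalTypeInfo.getFieldTypes
    }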

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 96c427e..3cb872a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -24,15 +24,16 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isLong, isTimePoint}
 import org.apache.flink.types.Row
 
 /**
@@ -106,8 +107,6 @@ class DataSetWindowAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val config = tableEnv.getConfig
-
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val generator = new CodeGenerator(
@@ -119,30 +118,31 @@ class DataSetWindowAggregate(
     val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
 
     window match {
-      case EventTimeTumblingGroupWindow(_, _, size) =>
+      case TumblingGroupWindow(_, timeField, size)
+          if isTimePoint(timeField.resultType) || isLong(timeField.resultType) =>
         createEventTimeTumblingWindowDataSet(
           generator,
           inputDS,
-          isTimeInterval(size.resultType),
+          isTimeIntervalLiteral(size),
           caseSensitive)
 
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, timeField, gap)
+          if isTimePoint(timeField.resultType) || isLong(timeField.resultType) =>
         createEventTimeSessionWindowDataSet(generator, inputDS, caseSensitive)
 
-      case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      case SlidingGroupWindow(_, timeField, size, slide)
+          if isTimePoint(timeField.resultType) || isLong(timeField.resultType) =>
         createEventTimeSlidingWindowDataSet(
           generator,
           inputDS,
-          isTimeInterval(size.resultType),
+          isTimeIntervalLiteral(size),
           asLong(size),
           asLong(slide),
           caseSensitive)
 
-      case _: ProcessingTimeGroupWindow =>
+      case _ =>
         throw new UnsupportedOperationException(
-          "Processing-time tumbling windows are not supported in a batch environment, " +
-            "windows in a batch environment must declare a time attribute over which " +
-            "the query is evaluated.")
+          s"Window $window is not supported in a batch environment.")
     }
   }
 
@@ -152,18 +152,22 @@ class DataSetWindowAggregate(
       isTimeWindow: Boolean,
       isParserCaseSensitive: Boolean): DataSet[Row] = {
 
+    val input = inputNode.asInstanceOf[DataSetRel]
+
     val mapFunction = createDataSetWindowPrepareMapFunction(
       generator,
       window,
       namedAggregates,
       grouping,
-      inputType,
+      input.getRowType,
+      inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
       isParserCaseSensitive)
     val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
       generator,
       window,
       namedAggregates,
-      inputType,
+      input.getRowType,
+      inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
       getRowType,
       grouping,
       namedProperties)
@@ -210,6 +214,8 @@ class DataSetWindowAggregate(
       inputDS: DataSet[Row],
       isParserCaseSensitive: Boolean): DataSet[Row] = {
 
+    val input = inputNode.asInstanceOf[DataSetRel]
+
     val groupingKeys = grouping.indices.toArray
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
@@ -219,7 +225,8 @@ class DataSetWindowAggregate(
       window,
       namedAggregates,
       grouping,
-      inputType,
+      input.getRowType,
+      inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
       isParserCaseSensitive)
 
     val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
@@ -245,7 +252,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           grouping)
 
         // create groupReduceFunction for calculating the aggregations
@@ -253,7 +261,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           rowRelDataType,
           grouping,
           namedProperties,
@@ -275,7 +284,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           grouping)
 
         // create groupReduceFunction for calculating the aggregations
@@ -283,7 +293,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           rowRelDataType,
           grouping,
           namedProperties,
@@ -308,7 +319,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           rowRelDataType,
           grouping,
           namedProperties)
@@ -324,7 +336,8 @@ class DataSetWindowAggregate(
           generator,
           window,
           namedAggregates,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           rowRelDataType,
           grouping,
           namedProperties)
@@ -347,6 +360,8 @@ class DataSetWindowAggregate(
       isParserCaseSensitive: Boolean)
     : DataSet[Row] = {
 
+    val input = inputNode.asInstanceOf[DataSetRel]
+
     // create MapFunction for initializing the aggregations
     // it aligns the rowtime for pre-tumbling in case of a time window for partial aggregates
     val mapFunction = createDataSetWindowPrepareMapFunction(
@@ -354,7 +369,8 @@ class DataSetWindowAggregate(
       window,
       namedAggregates,
       grouping,
-      inputType,
+      input.getRowType,
+      inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
       isParserCaseSensitive)
 
     val mappedDataSet = inputDS
@@ -390,7 +406,8 @@ class DataSetWindowAggregate(
           window,
           namedAggregates,
           grouping,
-          inputType,
+          input.getRowType,
+          inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
           isParserCaseSensitive)
 
         mappedDataSet.asInstanceOf[DataSet[Row]]
@@ -426,7 +443,8 @@ class DataSetWindowAggregate(
       generator,
       window,
       namedAggregates,
-      inputType,
+      input.getRowType,
+      inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
       rowRelDataType,
       grouping,
       namedProperties,
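
The guards `isTimePoint(timeField.resultType) || isLong(timeField.resultType)` in the window match above replace the former EventTime* window classes: in a batch environment, a group window is translated as an event-time window exactly when its time field is a SQL time point or a long. A hedged, standalone approximation of that check (the real isTimePoint/isLong live in TypeCheckUtils):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}

    object BatchTimeFieldCheck {
      // approximates `isTimePoint(t) || isLong(t)` as used in the match above
      def isBatchEventTimeField(t: TypeInformation[_]): Boolean =
        t == SqlTimeTypeInfo.DATE ||
          t == SqlTimeTypeInfo.TIME ||
          t == SqlTimeTypeInfo.TIMESTAMP ||
          t == BasicTypeInfo.LONG_TYPE_INFO
    }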

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index c232a71..5697449 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -25,20 +25,18 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
 
 class DataStreamAggregate(
@@ -48,12 +46,12 @@ class DataStreamAggregate(
     traitSet: RelTraitSet,
     inputNode: RelNode,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowRelDataType: RelDataType,
-    inputType: RelDataType,
+    schema: RowSchema,
+    inputSchema: RowSchema,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = rowRelDataType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamAggregate(
@@ -63,22 +61,22 @@ class DataStreamAggregate(
       traitSet,
       inputs.get(0),
       namedAggregates,
-      getRowType,
-      inputType,
+      schema,
+      inputSchema,
       grouping)
   }
 
   override def toString: String = {
     s"Aggregate(${
       if (!grouping.isEmpty) {
-        s"groupBy: (${groupingToString(inputType, grouping)}), "
+        s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), "
       } else {
         ""
       }
     }window: ($window), " +
       s"select: (${
         aggregationToString(
-          inputType,
+          inputSchema.logicalType,
           grouping,
           getRowType,
           namedAggregates,
@@ -88,13 +86,13 @@ class DataStreamAggregate(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .itemIf("groupBy", groupingToString(inputSchema.logicalType, grouping), !grouping.isEmpty)
       .item("window", window)
       .item(
         "select", aggregationToString(
-          inputType,
+          inputSchema.logicalType,
           grouping,
-          getRowType,
+          schema.logicalType,
           namedAggregates,
           namedProperties))
   }
@@ -102,17 +100,20 @@ class DataStreamAggregate(
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
+      new CalcitePair[AggregateCall, String](
+        inputSchema.mapAggregateCall(namedAggregate.left),
+        namedAggregate.right)
+    }
 
     val aggString = aggregationToString(
-      inputType,
+      inputSchema.logicalType,
       grouping,
-      getRowType,
+      schema.logicalType,
       namedAggregates,
       namedProperties)
 
-    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " +
       s"window: ($window), " +
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
@@ -123,21 +124,21 @@ class DataStreamAggregate(
       inputDS.getType)
 
     val needMerge = window match {
-      case ProcessingTimeSessionGroupWindow(_, _) => true
-      case EventTimeSessionGroupWindow(_, _, _) => true
+      case SessionGroupWindow(_, _, _) => true
       case _ => false
     }
+    val physicalGrouping = grouping.map(inputSchema.mapIndex)
 
     // grouped / keyed aggregation
-    if (grouping.length > 0) {
+    if (physicalGrouping.length > 0) {
       val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
         window,
-        grouping.length,
-        namedAggregates.size,
-        rowRelDataType.getFieldCount,
+        physicalGrouping.length,
+        physicalNamedAggregates.size,
+        schema.physicalArity,
         namedProperties)
 
-      val keyedStream = inputDS.keyBy(grouping: _*)
+      val keyedStream = inputDS.keyBy(physicalGrouping: _*)
       val windowedStream =
         createKeyedWindowedStream(window, keyedStream)
           .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
@@ -145,20 +146,26 @@ class DataStreamAggregate(
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
-          namedAggregates,
-          inputType,
-          rowRelDataType,
+          physicalNamedAggregates,
+          inputSchema.physicalType,
+          inputSchema.physicalFieldTypeInfo,
+          schema.physicalType,
           needMerge)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(
+          aggFunction,
+          windowFunction,
+          accumulatorRowType,
+          aggResultRowType,
+          schema.physicalTypeInfo)
         .name(keyedAggOpName)
     }
     // global / non-keyed aggregation
     else {
       val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
         window,
-        rowRelDataType.getFieldCount,
+        schema.physicalArity,
         namedProperties)
 
       val windowedStream =
@@ -168,13 +175,19 @@ class DataStreamAggregate(
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
-          namedAggregates,
-          inputType,
-          rowRelDataType,
+          physicalNamedAggregates,
+          inputSchema.physicalType,
+          inputSchema.physicalFieldTypeInfo,
+          schema.physicalType,
           needMerge)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(
+          aggFunction,
+          windowFunction,
+          accumulatorRowType,
+          aggResultRowType,
+          schema.physicalTypeInfo)
         .name(nonKeyedAggOpName)
     }
   }
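
Logical grouping keys and aggregate calls are remapped to physical positions above (inputSchema.mapIndex, inputSchema.mapAggregateCall) because the logical and physical rows may not line up field by field once time indicators are involved. One plausible scheme, sketched here purely as an assumption about how such a mapping could work, shifts indices left past logical fields that lack a physical counterpart:

    // hypothetical sketch of an index mapping; the set of logical positions
    // without a physical counterpart is assumed to be known up front
    object IndexMappingSketch {
      def mapIndex(logicalIndex: Int, logicalOnlyPositions: Set[Int]): Int = {
        require(!logicalOnlyPositions.contains(logicalIndex),
          "field has no physical index")
        logicalIndex - logicalOnlyPositions.count(_ < logicalIndex)
      }
    }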
@@ -186,95 +199,102 @@ object DataStreamAggregate {
   private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
     : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
 
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+    case TumblingGroupWindow(_, timeField, size)
+        if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.window(TumblingProcessingTimeWindows.of(toTime(size)))
 
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindow(asCount(size))
+    case TumblingGroupWindow(_, timeField, size)
+        if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
+      stream.countWindow(toLong(size))
 
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingEventTimeWindows.of(asTime(size)))
+    case TumblingGroupWindow(_, timeField, size)
+        if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.window(TumblingEventTimeWindows.of(toTime(size)))
 
-    case EventTimeTumblingGroupWindow(_, _, size) =>
+    case TumblingGroupWindow(_, _, size) =>
       // TODO: an event-time tumbling group window should sort the stream on
       // event time before applying the windowing logic. Otherwise, this would
       // be the same as a processing-time tumbling group window
       throw new UnsupportedOperationException(
         "Event-time grouping windows on row intervals are currently not supported.")
 
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isProctimeAttribute(timeField) && isTimeIntervalLiteral(slide) =>
+      stream.window(SlidingProcessingTimeWindows.of(toTime(size), toTime(slide)))
 
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindow(asCount(size), asCount(slide))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
+      stream.countWindow(toLong(size), toLong(slide))
 
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.window(SlidingEventTimeWindows.of(toTime(size), toTime(slide)))
 
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+    case SlidingGroupWindow(_, _, size, slide) =>
       // TODO: an event-time sliding group window should sort the stream on
       // event time before applying the windowing logic. Otherwise, this would
       // be the same as a processing-time sliding group window
       throw new UnsupportedOperationException(
         "Event-time grouping windows on row intervals are currently not supported.")
 
-    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
-      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+    case SessionGroupWindow(_, timeField, gap)
+        if isProctimeAttribute(timeField) =>
+      stream.window(ProcessingTimeSessionWindows.withGap(toTime(gap)))
 
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+    case SessionGroupWindow(_, timeField, gap)
+        if isRowtimeAttribute(timeField) =>
+      stream.window(EventTimeSessionWindows.withGap(toTime(gap)))
   }
 
   private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
     : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
 
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+    case TumblingGroupWindow(_, timeField, size)
+        if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.windowAll(TumblingProcessingTimeWindows.of(toTime(size)))
 
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindowAll(asCount(size))
+    case TumblingGroupWindow(_, timeField, size)
+        if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
+      stream.countWindowAll(toLong(size))
 
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+    case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingEventTimeWindows.of(toTime(size)))
 
-    case EventTimeTumblingGroupWindow(_, _, size) =>
+    case TumblingGroupWindow(_, _, size) =>
       // TODO: an event-time tumbling group window should sort the stream on
       // event time before applying the windowing logic. Otherwise, this would
       // be the same as a processing-time tumbling group window
       throw new UnsupportedOperationException(
         "Event-time grouping windows on row intervals are currently not supported.")
 
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.windowAll(SlidingProcessingTimeWindows.of(toTime(size), toTime(slide)))
 
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindowAll(asCount(size), asCount(slide))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
+      stream.countWindowAll(toLong(size), toLong(slide))
 
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+    case SlidingGroupWindow(_, timeField, size, slide)
+        if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
+      stream.windowAll(SlidingEventTimeWindows.of(toTime(size), toTime(slide)))
 
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+    case SlidingGroupWindow(_, _, size, slide) =>
       // TODO: an event-time sliding group window should sort the stream on
       // event time before applying the windowing logic. Otherwise, this would
       // be the same as a processing-time sliding group window
       throw new UnsupportedOperationException(
         "Event-time grouping windows on row intervals are currently not supported.")
 
-    case ProcessingTimeSessionGroupWindow(_, gap) =>
-      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+    case SessionGroupWindow(_, timeField, gap)
+        if isProctimeAttribute(timeField) && isTimeIntervalLiteral(gap) =>
+      stream.windowAll(ProcessingTimeSessionWindows.withGap(toTime(gap)))
 
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+    case SessionGroupWindow(_, timeField, gap)
+        if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(gap) =>
+      stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap)))
   }
 
-  def asTime(expr: Expression): Time = expr match {
-    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
-    case _ => throw new IllegalArgumentException()
-  }
 
-  def asCount(expr: Expression): Long = expr match {
-    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
-    case _ => throw new IllegalArgumentException()
-  }
 }
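
The removed asTime and asCount helpers are superseded by toTime and toLong from ExpressionUtils (imported at the top of the file). Judging from the deleted code, presumably equivalent implementations look like this sketch (not the actual ExpressionUtils source):

    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.expressions.{Expression, Literal}
    import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}

    object IntervalLiteralSketch {

      // extracts the millisecond value of a time-interval literal
      def toTime(expr: Expression): Time = expr match {
        case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
          Time.milliseconds(value)
        case _ =>
          throw new IllegalArgumentException("Time interval literal expected.")
      }

      // extracts the row count of a row-interval literal
      def toLong(expr: Expression): Long = expr match {
        case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
        case _ =>
          throw new IllegalArgumentException("Row interval literal expected.")
      }
    }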
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b015a1d..c6c25c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -27,9 +27,9 @@ import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 /**
@@ -40,17 +40,25 @@ class DataStreamCalc(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
-    rowRelDataType: RelDataType,
+    inputSchema: RowSchema,
+    schema: RowSchema,
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = rowRelDataType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
-    new DataStreamCalc(cluster, traitSet, child, getRowType, program, ruleDescription)
+    new DataStreamCalc(
+      cluster,
+      traitSet,
+      child,
+      inputSchema,
+      schema,
+      program,
+      ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, getExpressionString)
@@ -85,8 +93,8 @@ class DataStreamCalc(
 
     val body = functionBody(
       generator,
-      inputDataStream.getType,
-      getRowType,
+      inputSchema,
+      schema,
       calcProgram,
       config)
 
@@ -94,7 +102,7 @@ class DataStreamCalc(
       ruleDescription,
       classOf[FlatMapFunction[Row, Row]],
       body,
-      FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
+      schema.physicalTypeInfo)
 
     val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
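
For a concrete picture of what the generated calc function compiled from `body` looks like: in shape it is an ordinary FlatMapFunction over Rows that inlines the calc program's filter condition and projection. A hand-written, hypothetical example of such output (field layout and predicate invented for illustration):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    class SketchCalcFunction extends FlatMapFunction[Row, Row] {
      override def flatMap(in: Row, out: Collector[Row]): Unit = {
        // assumed input layout: (name: String, amount: Long)
        val amount = in.getField(1).asInstanceOf[Long]
        // assumed filter: amount > 10
        if (amount > 10L) {
          val result = new Row(2)
          result.setField(0, in.getField(0))
          // assumed projection: amount * 2
          result.setField(1, amount * 2)
          out.collect(result)
        }
      }
    }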

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 342920a..8955110 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
@@ -28,6 +27,7 @@ import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 /**
@@ -36,28 +36,30 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
+    inputSchema: RowSchema,
     inputNode: RelNode,
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
-    relRowType: RelDataType,
-    joinRowType: RelDataType,
+    schema: RowSchema,
+    joinSchema: RowSchema,
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
   with CommonCorrelate
   with DataStreamRel {
 
-  override def deriveRowType() = relRowType
+  override def deriveRowType() = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamCorrelate(
       cluster,
       traitSet,
+      inputSchema,
       inputs.get(0),
       scan,
       condition,
-      relRowType,
-      joinRowType,
+      schema,
+      joinSchema,
       joinType,
       ruleDescription)
   }
@@ -74,7 +76,7 @@ class DataStreamCorrelate(
     super.explainTerms(pw)
       .item("invocation", scan.getCall)
       .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
-      .item("rowType", relRowType)
+      .item("rowType", schema.logicalType)
       .item("joinType", joinType)
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
@@ -94,16 +96,16 @@ class DataStreamCorrelate(
 
     val mapFunc = correlateMapFunction(
       config,
-      inputDS.getType,
+      inputSchema,
       udtfTypeInfo,
-      getRowType,
+      schema,
       joinType,
       rexCall,
       condition,
       Some(pojoFieldMapping),
       ruleDescription)
 
-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
   }
 
 }
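
Taken together, these planner changes let the new time indicators flow from the API into window translation. A hedged usage sketch (environment setup and field names are assumed; `.rowtime` marks the event-time indicator this change integrates):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // assumed source: (id, amount) pairs with timestamps and watermarks assigned
    val stream: DataStream[(String, Long)] = ???

    // append 'rowtime as an event-time indicator during conversion
    val table = tEnv.fromDataStream(stream, 'id, 'amount, 'rowtime.rowtime)

    // the indicator then serves as the time field of a group window
    val result = table
      .window(Tumble over 10.minutes on 'rowtime as 'w)
      .groupBy('w, 'id)
      .select('id, 'amount.sum)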