You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:41 UTC

[07/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL. Continued

[FLINK-5884] [table] Integrate time indicators for Table API & SQL. Continued


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24bf61ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24bf61ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24bf61ce

Branch: refs/heads/master
Commit: 24bf61ceb332f2db2dc4bab624b73beffae1160a
Parents: 495f104
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu May 4 18:05:27 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:54 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  25 +--
 .../table/api/StreamTableEnvironment.scala      |  86 ++++++++--
 .../flink/table/api/TableEnvironment.scala      |  64 +-------
 .../flink/table/calcite/FlinkTypeFactory.scala  |  17 +-
 .../table/expressions/ExpressionParser.scala    |  18 +-
 .../table/plan/logical/LogicalWindow.scala      |   2 +-
 .../flink/table/plan/logical/groupWindows.scala |  11 +-
 .../flink/table/plan/nodes/CommonCalc.scala     |  10 +-
 .../plan/nodes/PhysicalTableSourceScan.scala    |   2 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   2 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |   5 +-
 .../datastream/DataStreamOverAggregate.scala    |   5 +-
 .../datastream/StreamTableSourceScan.scala      |  28 +---
 .../logical/FlinkLogicalTableSourceScan.scala   |  32 +++-
 .../plan/schema/StreamTableSourceTable.scala    |  65 ++++++++
 .../table/runtime/aggregate/AggregateUtil.scala |   3 +-
 .../table/sources/DefinedTimeAttributes.scala   |  47 ++++--
 .../flink/table/TableEnvironmentTest.scala      |  52 ++----
 .../api/scala/batch/table/GroupWindowTest.scala |   4 +-
 .../stream/StreamTableEnvironmentTest.scala     | 164 +++++++++++++++++++
 .../api/scala/stream/TableSourceTest.scala      | 154 +++++++++++++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  59 ++++---
 22 files changed, 628 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 3eb2ffc..02c6063 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -196,26 +196,11 @@ abstract class BatchTableEnvironment(
 
     val (fieldNames, fieldIndexes) = getFieldInfo[T](
       dataSet.getType,
-      fields,
-      ignoreTimeAttributes = true)
+      fields)
 
-    // validate and extract time attributes
-    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
-
-    // don't allow proctime on batch
-    proctime match {
-      case Some(_) =>
-        throw new ValidationException(
-          "A proctime attribute is not allowed in a batch environment. " +
-            "Working with processing-time on batch would lead to non-deterministic results.")
-      case _ => // ok
-    }
-    // rowtime must not extend the schema of a batch table
-    rowtime match {
-      case Some((idx, _)) if idx >= dataSet.getType.getArity =>
-        throw new ValidationException(
-          "A rowtime attribute must be defined on an existing field in a batch environment.")
-      case _ => // ok
+    if (fields.exists(_.isInstanceOf[TimeAttribute])) {
+      throw new ValidationException(
+        ".rowtime and .proctime time indicators are not allowed in a batch environment.")
     }
 
     val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d1f2fb5..dd2c09d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,19 +26,21 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.typeutils.TypeCheckUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -99,7 +101,7 @@ abstract class StreamTableEnvironment(
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(streamTableSource))
+        registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
             "StreamTableEnvironment")
@@ -168,14 +170,13 @@ abstract class StreamTableEnvironment(
       fields: Array[Expression])
     : Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](
-      dataStream.getType,
-      fields,
-      ignoreTimeAttributes = false)
+    val streamType = dataStream.getType
 
-    // validate and extract time attributes
-    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+    // get field names and types for all non-replaced fields
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
 
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
 
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
@@ -188,6 +189,71 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Validates that at most one rowtime and one proctime attribute is defined and extracts them.
+    *
+    * @param streamType type of the stream, used to check the type of a replaced field
+    * @param exprs schema expressions; the position of a time attribute becomes its index
+    * @return (index, name) of the rowtime and of the proctime attribute, if defined
+    */
+  private def validateAndExtractTimeAttributes(
+    streamType: TypeInformation[_],
+    exprs: Array[Expression])
+  : (Option[(Int, String)], Option[(Int, String)]) = {
+
+    val fieldTypes: Array[TypeInformation[_]] = streamType match {
+      case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
+      case t => Array[TypeInformation[_]](t) // any non-composite type is a single field
+    }
+
+    var fieldNames: List[String] = Nil
+    var rowtime: Option[(Int, String)] = None
+    var proctime: Option[(Int, String)] = None
+
+    exprs.zipWithIndex.foreach {
+      case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
+        if (rowtime.isDefined) {
+          throw new TableException(
+            "The rowtime attribute can only be defined once in a table schema.")
+        }
+        // a rowtime attribute that replaces an existing field needs a Long or time point field
+        if (idx < fieldTypes.length &&
+          !(TypeCheckUtils.isLong(fieldTypes(idx)) ||
+            TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
+          throw new TableException(
+            "The rowtime attribute can only replace a field with a valid time type, such as " +
+              "Timestamp or Long.")
+        }
+        rowtime = Some((idx, name))
+      case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
+        if (proctime.isDefined) {
+          throw new TableException(
+            "The proctime attribute can only be defined once in a table schema.")
+        }
+        // check that proctime is only appended
+        if (idx < fieldTypes.length) {
+          throw new TableException(
+            "The proctime attribute can only be appended to the table schema and not replace " +
+              "an existing field. Please move it to the end of the schema.")
+        }
+        proctime = Some((idx, name))
+      case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
+      case _ => // ignore other expressions (e.g. aliases) instead of failing with a MatchError
+    }
+
+    if (rowtime.exists(r => fieldNames.contains(r._2))) {
+      throw new TableException(
+        "The rowtime attribute may not have the same name as another field.")
+    }
+
+    if (proctime.exists(p => fieldNames.contains(p._2))) {
+      throw new TableException(
+        "The proctime attribute may not have the same name as another field.")
+    }
+
+    (rowtime, proctime)
+  }
+
+  /**
     * Returns the decoration rule set for this environment
     * including a custom RuleSet configuration.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 4c72e8f..9ed5000 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -601,50 +601,36 @@ abstract class TableEnvironment(val config: TableConfig) {
 
   /**
     * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]]. It does not handle time attributes but considers them in indices, if
-    * ignore flag is not false.
+    * [[Expression]]. It does not handle time attributes but considers them in indices.
     *
     * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs The expressions that define the field names.
-    * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
+    * @param exprs     The expressions that define the field names.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](
       inputType: TypeInformation[A],
-      exprs: Array[Expression],
-      ignoreTimeAttributes: Boolean)
+      exprs: Array[Expression])
     : (Array[String], Array[Int]) = {
 
     TableEnvironment.validateType(inputType)
 
-    val filteredExprs = if (ignoreTimeAttributes) {
-        exprs.map {
-          case ta: TimeAttribute => ta.expression
-          case e@_ => e
-        }
-    } else {
-      exprs
-    }
-
     val indexedNames: Array[(Int, String)] = inputType match {
       case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
             "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
-        filteredExprs.zipWithIndex flatMap {
+        exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
             if (idx > 0) {
               throw new TableException("Table of atomic type can only have a single field.")
             }
             Some((0, name))
-          case (_: TimeAttribute, _) if ignoreTimeAttributes =>
-            None
           case _ => throw new TableException("Field reference expression requested.")
         }
       case t: TupleTypeInfo[A] =>
-        filteredExprs.zipWithIndex flatMap {
+        exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
             Some((idx, name))
           case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
@@ -659,7 +645,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             "Field reference expression or alias on field expression expected.")
         }
       case c: CaseClassTypeInfo[A] =>
-        filteredExprs.zipWithIndex flatMap {
+        exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
             Some((idx, name))
           case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
@@ -674,7 +660,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             "Field reference expression or alias on field expression expected.")
         }
       case p: PojoTypeInfo[A] =>
-        filteredExprs flatMap {
+        exprs flatMap {
           case (UnresolvedFieldReference(name)) =>
             val idx = p.getFieldIndex(name)
             if (idx < 0) {
@@ -822,42 +808,6 @@ abstract class TableEnvironment(val config: TableConfig) {
     Some(mapFunction)
   }
 
-  /**
-    * Checks for at most one rowtime and proctime attribute.
-    * Returns the time attributes.
-    *
-    * @return rowtime attribute and proctime attribute
-    */
-  protected def validateAndExtractTimeAttributes(
-      fieldNames: Seq[String],
-      fieldIndices: Seq[Int],
-      exprs: Array[Expression])
-    : (Option[(Int, String)], Option[(Int, String)]) = {
-
-    var rowtime: Option[(Int, String)] = None
-    var proctime: Option[(Int, String)] = None
-
-    exprs.zipWithIndex.foreach {
-      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
-        if (rowtime.isDefined) {
-          throw new TableException(
-            "The rowtime attribute can only be defined once in a table schema.")
-        } else {
-          rowtime = Some(idx, name)
-        }
-      case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
-        if (proctime.isDefined) {
-          throw new TableException(
-            "The proctime attribute can only be defined once in a table schema.")
-        } else {
-          proctime = Some(idx, name)
-        }
-      case _ =>
-        // do nothing
-    }
-
-    (rowtime, proctime)
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 001011b..9281ad8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -65,11 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           createSqlIntervalType(
             new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
 
-        case TimeIndicatorTypeInfo.ROWTIME_INDICATOR =>
-          createRowtimeIndicatorType()
-
-        case TimeIndicatorTypeInfo.PROCTIME_INDICATOR =>
-          createProctimeIndicatorType()
+        case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+          if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+            createRowtimeIndicatorType()
+          } else {
+            createProctimeIndicatorType()
+          }
 
         case _ =>
           createSqlType(sqlType)
@@ -114,9 +115,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
     * @param rowtime optional system field to indicate event-time; the index determines the index
-    *                in the final record and might replace an existing field
+    *                in the final record. If the index is smaller than the number of specified
+    *                fields, it shifts all following fields.
     * @param proctime optional system field to indicate processing-time; the index determines the
-    *                 index in the final record and might replace an existing field
+    *                 index in the final record. If the index is smaller than the number of
+    *                 specified fields, it shifts all following fields.
     * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
   def buildLogicalRowType(

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c33f8fc..98580ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -98,11 +98,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
   lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
   lazy val ASIN: Keyword = Keyword("asin")
+  lazy val ROWTIME: Keyword = Keyword("rowtime")
+  lazy val PROCTIME: Keyword = Keyword("proctime")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
       not(SUM0) ~ not(STDDEV_POP) ~ not(STDDEV_SAMP) ~ not(VAR_POP) ~ not(VAR_SAMP) ~
       not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
+      not(ROWTIME) ~ not(PROCTIME) ~
       not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
       super.ident
 
@@ -532,12 +535,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   // alias
 
-  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+  lazy val alias: PackratParser[Expression] = timeIndicator |
+    logic ~ AS ~ fieldReference ^^ {
       case e ~ _ ~ name => Alias(e, name.name)
   } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
     case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
   } | logic
 
+  // time indicators
+
+  lazy val timeIndicator: PackratParser[Expression] = procTime | rowTime
+
+  lazy val procTime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+    case f ~ _ ~ _ => ProctimeAttribute(f)
+  }
+
+  lazy val rowTime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+    case f ~ _ ~ _ => RowtimeAttribute(f)
+  }
+
   lazy val expression: PackratParser[Expression] = alias |
     failure("Invalid expression.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 92dc501..6161ef0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.expressions.{Expression, WindowReference}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 /**
-  * Logical super class for all types of windows (group-windows and row-windows).
+  * Logical super class for group windows.
   *
   * @param aliasAttribute window alias
   * @param timeAttribute time field indicating event-time or processing-time

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 3e5de28..4a8fb52 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.logical
 import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 // ------------------------------------------------------------------------------------------------
@@ -56,7 +56,8 @@ case class TumblingGroupWindow(
         case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
           ValidationFailure(
             "Tumbling window expects a time attribute for grouping in a stream environment.")
-        case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
+        case _: BatchTableEnvironment
+          if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
           ValidationFailure(
             "Tumbling window expects a time attribute for grouping in a stream environment.")
 
@@ -119,7 +120,8 @@ case class SlidingGroupWindow(
         case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
           ValidationFailure(
             "Sliding window expects a time attribute for grouping in a stream environment.")
-        case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
+        case _: BatchTableEnvironment
+          if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
           ValidationFailure(
             "Sliding window expects a time attribute for grouping in a stream environment.")
 
@@ -169,7 +171,8 @@ case class SessionGroupWindow(
         case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
           ValidationFailure(
             "Session window expects a time attribute for grouping in a stream environment.")
-        case _: BatchTableEnvironment if isTimePoint(gap.resultType) =>
+        case _: BatchTableEnvironment
+          if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
           ValidationFailure(
             "Session window expects a time attribute for grouping in a stream environment.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 5c35129..ff5ffb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -34,11 +34,11 @@ import scala.collection.JavaConverters._
 trait CommonCalc {
 
   private[flink] def functionBody(
-     generator: CodeGenerator,
-     inputSchema: RowSchema,
-     returnSchema: RowSchema,
-     calcProgram: RexProgram,
-     config: TableConfig)
+      generator: CodeGenerator,
+      inputSchema: RowSchema,
+      returnSchema: RowSchema,
+      calcProgram: RexProgram,
+      config: TableConfig)
     : String = {
 
     val expandedExpressions = calcProgram

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index c18c3d1..dc7a0d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -46,7 +46,7 @@ abstract class PhysicalTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val terms = super.explainTerms(pw)
-        .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+        .item("fields", deriveRowType().getFieldNames.asScala.mkString(", "))
 
     val sourceDesc = tableSource.explainSource()
     if (sourceDesc.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index c22dc54..b53081c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -90,8 +90,8 @@ class DataSetAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val input = inputNode.asInstanceOf[DataSetRel]
+    val inputDS = input.translateToPlan(tableEnv)
 
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9e18082..5274fa1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -103,10 +103,7 @@ class DataSetCalc(
       body,
       rowTypeInfo)
 
-    val runner = new FlatMapRunner[Row, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
+    val runner = calcMapFunction(genFunction)
 
     inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 8eb9d40..db31f32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -241,10 +241,7 @@ class DataStreamOverAggregate(
     }
 
     val precedingOffset =
-      getLowerBoundary(
-        logicWindow,
-        overWindow,
-        input) + (if (isRowsClause) 1 else 0)
+      getLowerBoundary(logicWindow, overWindow, input) + (if (isRowsClause) 1 else 0)
 
     val processFunction = AggregateUtil.createBoundedOverProcessFunction(
       generator,

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5dc3da8..e34e416 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable}
-import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource}
+import org.apache.flink.table.sources._
 import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -41,35 +41,23 @@ class StreamTableSourceScan(
   override def deriveRowType() = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 
-    def removeIndex[T](idx: Int, l: List[T]): List[T] = {
-      if (l.size < idx) {
-        l
-      } else {
-        l.take(idx) ++ l.drop(idx + 1)
-      }
-    }
+    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-    var fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+    val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null =>
+      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
-        // remove physical field if it is overwritten by time attribute
-        fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames)
-        fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes)
-        Some((rowtimeAttribute.f0, rowtimeAttribute.f1))
+        Some((fieldCnt, rowtimeAttribute))
       case _ =>
         None
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null =>
+      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
-        // remove physical field if it is overwritten by time attribute
-        fieldNames = removeIndex(proctimeAttribute.f0, fieldNames)
-        fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes)
-        Some((proctimeAttribute.f0, proctimeAttribute.f1))
+        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>
         None
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 53e7b31..a2777ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
 
 import scala.collection.JavaConverters._
 
@@ -47,11 +47,33 @@ class FlinkLogicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+    val fieldCnt = fieldNames.length
+
+    val rowtime = tableSource match {
+      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+        val rowtimeAttribute = timeSource.getRowtimeAttribute
+        Some((fieldCnt, rowtimeAttribute))
+      case _ =>
+        None
+    }
+
+    val proctime = tableSource match {
+      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+        val proctimeAttribute = timeSource.getProctimeAttribute
+        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+      case _ =>
+        None
+    }
+
     flinkTypeFactory.buildLogicalRowType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType),
-      None,
-      None)
+      fieldNames,
+      fieldTypes,
+      rowtime,
+      proctime)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
new file mode 100644
index 0000000..75deca5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+
+class StreamTableSourceTable[T](
+    override val tableSource: TableSource[T],
+    override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends TableSourceTable[T](tableSource, statistic) {
+
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+    val fieldCnt = fieldNames.length
+
+    val rowtime = tableSource match {
+      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+        val rowtimeAttribute = timeSource.getRowtimeAttribute
+        Some((fieldCnt, rowtimeAttribute))
+      case _ =>
+        None
+    }
+
+    val proctime = tableSource match {
+      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+        val proctimeAttribute = timeSource.getProctimeAttribute
+        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+      case _ =>
+        None
+    }
+
+    flinkTypeFactory.buildLogicalRowType(
+      fieldNames,
+      fieldTypes,
+      rowtime,
+      proctime)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 07992cd..dfed34a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -81,7 +81,6 @@ object AggregateUtil {
       isRowsClause: Boolean)
     : ProcessFunction[Row, Row] = {
 
-    val needRetract = false
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
         namedAggregates.map(_.getKey),
@@ -107,7 +106,7 @@ object AggregateUtil {
       None,
       None,
       outputArity,
-      needRetract,
+      needRetract = false,
       needMerge = false,
       needReset = false
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
index 8466cdf..6d87663 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
@@ -18,30 +18,43 @@
 
 package org.apache.flink.table.sources
 
-import org.apache.flink.api.java.tuple.Tuple2
-
 /**
-  * Defines logical time attributes for a [[TableSource]]. Time attributes can be used for
-  * indicating, accessing, and working with Flink's event-time or processing-time. A
-  * [[TableSource]] that implements this interface can define names and positions of rowtime
-  * and proctime attributes in the rows it produces.
+  * Defines a logical event-time attribute for a [[TableSource]].
+  * The event-time attribute can be used for indicating, accessing, and working with Flink's
+  * event-time.
+  *
+  * A [[TableSource]] that implements this interface defines the name of
+  * the event-time attribute. The attribute will be added to the schema of the
+  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
   */
-trait DefinedTimeAttributes {
+trait DefinedRowTimeAttribute {
 
   /**
-    * Defines a name and position (starting at 0) of rowtime attribute that represents Flink's
-    * event-time. Null if no rowtime should be available. If the position is within the arity of
-    * the result row, the logical attribute will overwrite the physical attribute. If the position
-    * is higher than the result row, the time attribute will be appended logically.
+    * Defines a name of the event-time attribute that represents Flink's
+    * event-time. Null if no rowtime should be available.
+    *
+    * The field will be appended to the schema provided by the [[TableSource]].
     */
-  def getRowtimeAttribute: Tuple2[Int, String]
+  def getRowtimeAttribute: String
+}
+
+/**
+  * Defines a logical processing-time attribute for a [[TableSource]].
+  * The processing-time attribute can be used for indicating, accessing, and working with Flink's
+  * processing-time.
+  *
+  * A [[TableSource]] that implements this interface defines the name of
+  * the processing-time attribute. The attribute will be added to the schema of the
+  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+  */
+trait DefinedProcTimeAttribute {
 
   /**
-    * Defines a name and position (starting at 0) of proctime attribute that represents Flink's
-    * processing-time. Null if no proctime should be available. If the position is within the arity
-    * of the result row, the logical attribute will overwrite the physical attribute. If the
-    * position is higher than the result row, the time attribute will be appended logically.
+    * Defines a name of the processing-time attribute that represents Flink's
+    * processing-time. Null if no proctime should be available.
+    *
+    * The field will be appended to the schema provided by the [[TableSource]].
     */
-  def getProctimeAttribute: Tuple2[Int, String]
+  def getProctimeAttribute: String
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index faacc54..5247685 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -93,8 +93,7 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -108,8 +107,7 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -123,8 +121,7 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test
@@ -135,8 +132,7 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("pf3"),
         UnresolvedFieldReference("pf1"),
         UnresolvedFieldReference("pf2")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -146,9 +142,7 @@ class TableEnvironmentTest extends TableTestBase {
   def testGetFieldInfoAtomicName1(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       atomicType,
-      Array(UnresolvedFieldReference("name")),
-      ignoreTimeAttributes = true
-    )
+      Array(UnresolvedFieldReference("name")))
 
     fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
@@ -161,8 +155,7 @@ class TableEnvironmentTest extends TableTestBase {
       Array(
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test
@@ -173,8 +166,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("f0"), "name1"),
         Alias(UnresolvedFieldReference("f1"), "name2"),
         Alias(UnresolvedFieldReference("f2"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -188,8 +180,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("f2"), "name1"),
         Alias(UnresolvedFieldReference("f0"), "name2"),
         Alias(UnresolvedFieldReference("f1"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -203,8 +194,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
         Alias(UnresolvedFieldReference("zzz"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test
@@ -215,8 +205,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("cf1"), "name1"),
         Alias(UnresolvedFieldReference("cf2"), "name2"),
         Alias(UnresolvedFieldReference("cf3"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -230,8 +219,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("cf3"), "name1"),
         Alias(UnresolvedFieldReference("cf1"), "name2"),
         Alias(UnresolvedFieldReference("cf2"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -245,8 +233,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
         Alias(UnresolvedFieldReference("zzz"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test
@@ -257,8 +244,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("pf1"), "name1"),
         Alias(UnresolvedFieldReference("pf2"), "name2"),
         Alias(UnresolvedFieldReference("pf3"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -272,8 +258,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("pf3"), "name1"),
         Alias(UnresolvedFieldReference("pf1"), "name2"),
         Alias(UnresolvedFieldReference("pf2"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -287,8 +272,7 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
         Alias(UnresolvedFieldReference("zzz"), "name3")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test(expected = classOf[TableException])
@@ -297,16 +281,14 @@ class TableEnvironmentTest extends TableTestBase {
       atomicType,
       Array(
         Alias(UnresolvedFieldReference("name1"), "name2")
-      ),
-      ignoreTimeAttributes = true)
+      ))
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoGenericRowAlias(): Unit = {
     tEnv.getFieldInfo(
       genericRowType,
-      Array(UnresolvedFieldReference("first")),
-      ignoreTimeAttributes = true)
+      Array(UnresolvedFieldReference("first")))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index c481105..aa6edd3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -100,7 +100,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
       .window(Tumble over 2.rows on 'long as 'w)
@@ -144,7 +144,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
 
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
new file mode 100644
index 0000000..e9384c7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.Collections
+import java.util.{List => JList}
+
+import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+import org.mockito.Mockito.{mock, when}
+
+class StreamTableEnvironmentTest extends TableTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testInvalidProctimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    // cannot replace an attribute with proctime
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
+  }
+
+  @Test
+  def testProctimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    // a proctime attribute can be appended after the physical fields
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeAttributeReplaceFieldOfInvalidType(): Unit = {
+    val util = streamTestUtil()
+    // cannot replace a non-time attribute with rowtime
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
+  }
+
+  @Test
+  def testReplacedRowtimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e)
+  }
+
+  @Test
+  def testAppendedRowtimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttribute1(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime, 'pt.proctime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttribute2(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime, 'rt.rowtime)
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttribute3(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e, 'pt.proctime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeAndInvalidProctimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'pt.proctime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testOnlyOneRowtimeAttribute1(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 'rt.rowtime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testOnlyOneProctimeAttribute1(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt1.proctime, 'pt2.proctime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeAttributeUsedName(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'a.rowtime)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testProctimeAttributeUsedName(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'b.proctime)
+  }
+
+  @Test
+  def testProctimeAttributeParsed(): Unit = {
+    val (jTEnv, ds) = prepareSchemaExpressionParser
+    jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime")
+  }
+
+  @Test
+  def testReplacingRowtimeAttributeParsed(): Unit = {
+    val (jTEnv, ds) = prepareSchemaExpressionParser
+    jTEnv.fromDataStream(ds, "a.rowtime, b, c, d, e")
+  }
+
+  @Test
+  def testAppedingRowtimeAttributeParsed(): Unit = {
+    val (jTEnv, ds) = prepareSchemaExpressionParser
+    jTEnv.fromDataStream(ds, "a, b, c, d, e, rt.rowtime")
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeParsed1(): Unit = {
+    val (jTEnv, ds) = prepareSchemaExpressionParser
+    jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime, rt.rowtime")
+  }
+
+  @Test
+  def testRowtimeAndProctimeAttributeParsed2(): Unit = {
+    val (jTEnv, ds) = prepareSchemaExpressionParser
+    jTEnv.fromDataStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
+  }
+
+  private def prepareSchemaExpressionParser:
+    (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
+
+    val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv]))
+
+    val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
+      .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
+    val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]])
+    when(ds.getType).thenReturn(sType)
+
+    (jTEnv, ds)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
new file mode 100644
index 0000000..7673266
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase { // tests for sources declaring a time attribute
+
+  @Test
+  def testRowTimeTableSourceSimple(): Unit = { // select on a rowtime source, time field first
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+    val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
+
+    val expected = // scan lists addTime after the source's declared fields id, val, name
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+        term("select", "addTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected) // presumably compares t's plan to expected; TODO confirm
+  }
+
+  @Test
+  def testRowTimeTableSourceGroupWindow(): Unit = { // tumbling group window on addTime
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+    val t = util.tEnv.scan("rowTimeT")
+      .filter("val > 100")
+      .window(Tumble over 10.minutes on 'addTime as 'w) // 10 min = 600000 ms below
+      .groupBy('name, 'w)
+      .select('name, 'w.end, 'val.avg)
+
+    val expected = // Calc (filter) -> Aggregate (window) -> Calc (final projection)
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+            term("select", "name", "val", "addTime"), // id is not part of this select
+            term("where", ">(val, 100)")
+          ),
+          term("groupBy", "name"),
+          term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"),
+          term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+        ),
+        term("select", "name", "TMP_0", "TMP_1") // outer select lists TMP_0 before TMP_1
+      )
+    util.verifyTable(t, expected) // presumably compares t's plan to expected; TODO confirm
+  }
+
+  @Test
+  def testProcTimeTableSourceSimple(): Unit = { // select on a proctime source, time field first
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+    val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
+
+    val expected = // scan lists pTime after the source's declared fields id, val, name
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+        term("select", "pTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected) // presumably compares t's plan to expected; TODO confirm
+  }
+
+  @Test
+  def testProcTimeTableSourceOverWindow(): Unit = { // over window ordered by pTime
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+    val t = util.tEnv.scan("procTimeT")
+      .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w) // 2 h = 7200000 ms
+      .select('id, 'name, 'val.sum over 'w as 'valSum)
+      .filter('valSum > 100)
+
+    val expected = // OverAggregate sits directly on the scan; select and filter share one Calc
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+          term("partitionBy", "id"),
+          term("orderBy", "pTime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+        ),
+        term("select", "id", "name", "w0$o0 AS valSum"),
+        term("where", ">(w0$o0, 100)")
+      )
+    util.verifyTable(t, expected) // presumably compares t's plan to expected; TODO confirm
+  }
+}
+
+class TestRowTimeSource(timeField: String) // test stub: getDataStream is unimplemented (???)
+    extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getRowtimeAttribute: String = timeField // the name given to the constructor
+
+  override def getReturnType: TypeInformation[Row] = { // three fields; timeField is not one
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name")) // field names, in the same order as the types above
+  }
+}
+
+class TestProcTimeSource(timeField: String) // test stub: getDataStream is unimplemented (???)
+    extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getProctimeAttribute: String = timeField // the name given to the constructor
+
+  override def getReturnType: TypeInformation[Row] = { // three fields; timeField is not one
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name")) // field names, in the same order as the types above
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index edf7b1d..f84ae3d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 class WindowAggregateTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
@@ -85,7 +85,6 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testTumbleFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
@@ -98,24 +97,23 @@ class WindowAggregateTest extends TableTestBase {
         "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
-        "DataStreamCalc",
+        "DataStreamAggregate",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamCalc",
           streamTableNode(0),
-          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-          term("select",
-            "COUNT(*) AS EXPR$0, " +
-              "weightedAvg(c, a) AS wAvg, " +
-              "start('w$) AS w$start, " +
-              "end('w$) AS w$end")
+          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
         ),
-        term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+        term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+        term("select",
+          "COUNT(*) AS EXPR$0, " +
+            "weightedAvg(c, a) AS wAvg, " +
+            "start('w$) AS w$start, " +
+            "end('w$) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }
 
   @Test
-  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testHoppingFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
@@ -127,24 +125,23 @@ class WindowAggregateTest extends TableTestBase {
         "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
     val expected =
       unaryNode(
-        "DataStreamCalc",
+        "DataStreamAggregate",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamCalc",
           streamTableNode(0),
-          term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
-          term("select",
-            "COUNT(*) AS EXPR$0, " +
-              "weightedAvg(c, a) AS wAvg, " +
-              "start('w$) AS w$start, " +
-              "end('w$) AS w$end")
+          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
         ),
-        term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+        term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)),
+        term("select",
+          "COUNT(*) AS EXPR$0, " +
+            "weightedAvg(c, a) AS wAvg, " +
+            "start('w$) AS w$start, " +
+            "end('w$) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }
 
   @Test
-  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testSessionFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
@@ -157,18 +154,18 @@ class WindowAggregateTest extends TableTestBase {
         "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
-        "DataStreamCalc",
+        "DataStreamAggregate",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamCalc",
           streamTableNode(0),
-          term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
-          term("select",
-            "COUNT(*) AS EXPR$0, " +
-              "weightedAvg(c, a) AS wAvg, " +
-              "start('w$) AS w$start, " +
-              "end('w$) AS w$end")
+          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
         ),
-        term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+        term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
+        term("select",
+          "COUNT(*) AS EXPR$0, " +
+            "weightedAvg(c, a) AS wAvg, " +
+            "start('w$) AS w$start, " +
+            "end('w$) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }