You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:36 UTC

[02/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 63dc1ae..31ad558 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction}
-import org.apache.flink.table.functions.{AggregateFunction, EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _}
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -242,15 +242,11 @@ object FunctionCatalog {
     // array
     "cardinality" -> classOf[ArrayCardinality],
     "at" -> classOf[ArrayElementAt],
-    "element" -> classOf[ArrayElement],
+    "element" -> classOf[ArrayElement]
 
     // TODO implement function overloading here
     // "floor" -> classOf[TemporalFloor]
     // "ceil" -> classOf[TemporalCeil]
-
-    // extensions to support streaming query
-    "rowtime" -> classOf[RowTime],
-    "proctime" -> classOf[ProcTime]
   )
 
   /**
@@ -392,8 +388,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.ROUND,
     SqlStdOperatorTable.PI,
     // EXTENSIONS
-    EventTimeExtractor,
-    ProcTimeExtractor,
     SqlStdOperatorTable.TUMBLE,
     SqlStdOperatorTable.TUMBLE_START,
     SqlStdOperatorTable.TUMBLE_END,

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index cab3855..81c60b4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -406,15 +406,6 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 	}
 
 	@Test(expected = TableException.class)
-	public void testAsWithToFewFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Not enough field names specified.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
-	}
-
-	@Test(expected = TableException.class)
 	public void testAsWithToManyFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 9939a9c..faacc54 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -93,7 +93,8 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-    ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -107,7 +108,8 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-    ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -121,7 +123,8 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2"),
         UnresolvedFieldReference("name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test
@@ -132,7 +135,8 @@ class TableEnvironmentTest extends TableTestBase {
         UnresolvedFieldReference("pf3"),
         UnresolvedFieldReference("pf1"),
         UnresolvedFieldReference("pf2")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -142,7 +146,8 @@ class TableEnvironmentTest extends TableTestBase {
   def testGetFieldInfoAtomicName1(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       atomicType,
-      Array(UnresolvedFieldReference("name"))
+      Array(UnresolvedFieldReference("name")),
+      ignoreTimeAttributes = true
     )
 
     fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
@@ -156,7 +161,8 @@ class TableEnvironmentTest extends TableTestBase {
       Array(
         UnresolvedFieldReference("name1"),
         UnresolvedFieldReference("name2")
-      ))
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test
@@ -167,7 +173,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("f0"), "name1"),
         Alias(UnresolvedFieldReference("f1"), "name2"),
         Alias(UnresolvedFieldReference("f2"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -181,7 +188,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("f2"), "name1"),
         Alias(UnresolvedFieldReference("f0"), "name2"),
         Alias(UnresolvedFieldReference("f1"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -195,7 +203,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
         Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test
@@ -206,7 +215,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("cf1"), "name1"),
         Alias(UnresolvedFieldReference("cf2"), "name2"),
         Alias(UnresolvedFieldReference("cf3"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -220,7 +230,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("cf3"), "name1"),
         Alias(UnresolvedFieldReference("cf1"), "name2"),
         Alias(UnresolvedFieldReference("cf2"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -234,7 +245,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
         Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test
@@ -245,7 +257,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("pf1"), "name1"),
         Alias(UnresolvedFieldReference("pf2"), "name2"),
         Alias(UnresolvedFieldReference("pf3"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -259,7 +272,8 @@ class TableEnvironmentTest extends TableTestBase {
         Alias(UnresolvedFieldReference("pf3"), "name1"),
         Alias(UnresolvedFieldReference("pf1"), "name2"),
         Alias(UnresolvedFieldReference("pf2"), "name3")
-      ))
+      ),
+      ignoreTimeAttributes = true)
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -272,8 +286,9 @@ class TableEnvironmentTest extends TableTestBase {
       Array(
         Alias(UnresolvedFieldReference("xxx"), "name1"),
         Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias( UnresolvedFieldReference("zzz"), "name3")
-      ))
+        Alias(UnresolvedFieldReference("zzz"), "name3")
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test(expected = classOf[TableException])
@@ -282,12 +297,16 @@ class TableEnvironmentTest extends TableTestBase {
       atomicType,
       Array(
         Alias(UnresolvedFieldReference("name1"), "name2")
-      ))
+      ),
+      ignoreTimeAttributes = true)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoGenericRowAlias(): Unit = {
-    tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
+    tEnv.getFieldInfo(
+      genericRowType,
+      Array(UnresolvedFieldReference("first")),
+      ignoreTimeAttributes = true)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index e61e190..57ee3b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -208,16 +208,6 @@ class TableEnvironmentITCase(
   }
 
   @Test(expected = classOf[TableException])
-  def testToTableWithToFewFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
   def testToTableWithToManyFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index 0ccb557..71d0002 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -47,7 +47,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("select", "ts, a, b")
         ),
-        term("window", EventTimeTumblingGroupWindow('w$, 'ts, 7200000.millis)),
+        term("window", TumblingGroupWindow('w$, 'ts, 7200000.millis)),
         term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
@@ -76,7 +76,7 @@ class WindowAggregateTest extends TableTestBase {
           "DataSetWindowAggregate",
           batchTableNode(0),
           term("groupBy", "c"),
-          term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+          term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
           term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
@@ -106,7 +106,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("select", "ts, b, a")
         ),
-        term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+        term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
         term("select", "weightedAvg(b, a) AS wAvg")
       )
 
@@ -132,7 +132,7 @@ class WindowAggregateTest extends TableTestBase {
           term("select", "ts, a, b")
         ),
         term("window",
-          EventTimeSlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)),
+          SlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)),
         term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
@@ -162,7 +162,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("groupBy", "c, d"),
           term("window",
-            EventTimeSlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
+            SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
           term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
@@ -188,7 +188,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("select", "ts")
         ),
-        term("window", EventTimeSessionGroupWindow('w$, 'ts, 1800000.millis)),
+        term("window", SessionGroupWindow('w$, 'ts, 1800000.millis)),
         term("select", "COUNT(*) AS cnt")
       )
 
@@ -217,7 +217,7 @@ class WindowAggregateTest extends TableTestBase {
           "DataSetWindowAggregate",
           batchTableNode(0),
           term("groupBy", "c, d"),
-          term("window", EventTimeSessionGroupWindow('w$, 'ts, 43200000.millis)),
+          term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)),
           term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
@@ -249,7 +249,7 @@ class WindowAggregateTest extends TableTestBase {
             term("select", "ts, c")
           ),
           term("groupBy", "c"),
-          term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+          term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
           term("select", "c, start('w$) AS w$start, end('w$) AS w$end")
         ),
         term("select", "CAST(w$end) AS w$end")
@@ -304,7 +304,7 @@ class WindowAggregateTest extends TableTestBase {
 
     val sql = "SELECT COUNT(*) " +
       "FROM T " +
-      "GROUP BY TUMBLE(proctime(), b * INTERVAL '1' MINUTE)"
+      "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)"
     util.verifySql(sql, "n/a")
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 6ebfec0..b484293 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -19,14 +19,12 @@ package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{RowtimeAttribute, Upper, WindowReference}
-import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._
-import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
-import org.apache.flink.table.utils._
-import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{TableTestBase, _}
 import org.junit.Test
 
 /**
@@ -223,7 +221,8 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFromStreamingWindow(): Unit = {
-    val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val sourceTable = streamUtil
+      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
     val resultTable = sourceTable
         .window(Tumble over 5.millis on 'rowtime as 'w)
         .groupBy('w)
@@ -235,14 +234,14 @@ class FieldProjectionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "c", "a", "UPPER(c) AS $f2")
+          term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
         ),
         term("window",
-          EventTimeTumblingGroupWindow(
-           WindowReference("w"),
-            RowtimeAttribute(),
+          TumblingGroupWindow(
+            WindowReference("w"),
+            'rowtime,
             5.millis)),
-        term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1")
+        term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
       )
 
     streamUtil.verifyTable(resultTable, expected)
@@ -250,7 +249,8 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFromStreamingGroupedWindow(): Unit = {
-    val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+    val sourceTable = streamUtil
+      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
     val resultTable = sourceTable
         .window(Tumble over 5.millis on 'rowtime as 'w)
         .groupBy('w, 'b)
@@ -263,15 +263,15 @@ class FieldProjectionTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "c", "a", "b", "UPPER(c) AS $f3")
+            term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
           ),
           term("groupBy", "b"),
           term("window",
-            EventTimeTumblingGroupWindow(
-             WindowReference("w"),
-              RowtimeAttribute(),
+            TumblingGroupWindow(
+              WindowReference("w"),
+              'rowtime,
               5.millis)),
-          term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
+          term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
         ),
         term("select", "TMP_0", "TMP_1", "b")
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index 8a20f6d..c481105 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -63,25 +63,24 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================
 
   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+  def testInvalidProcessingTimeDefinition(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w)   // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
   }
 
   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+  def testInvalidProcessingTimeDefinition2(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }
 
-    table
-      .window(Tumble over 2.rows as 'w)   // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -101,7 +100,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
       .window(Tumble over 2.rows on 'long as 'w)
@@ -112,7 +111,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -135,7 +134,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -145,7 +144,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
@@ -156,7 +155,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -164,28 +163,6 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w) // require a time attribute
-      .groupBy('w)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 2.rows as 'w) // require a time attribute
-      .groupBy('w)
-      .select('int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -216,7 +193,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -240,7 +217,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -252,28 +229,6 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================
 
   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Slide over 50.milli every 50.milli as 'w) // require on a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Slide over 10.rows every 5.rows as 'w) // require on a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -302,7 +257,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -324,7 +279,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -348,7 +303,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window",
-           EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+           SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -356,17 +311,6 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Slide over 2.rows every 1.rows as 'w) // require on a time attribute
-      .groupBy('w)
-      .select('int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -374,7 +318,7 @@ class GroupWindowTest extends TableTestBase {
     val myWeightedAvg = new WeightedAvgWithMerge
 
     table
-      .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
+      .window(Slide over 2.minutes every 1.minute on 'long as 'w)
       .groupBy('w)
       // invalid function arguments
       .select(myWeightedAvg('int, 'string))
@@ -398,7 +342,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "long")
       ),
       term("window",
-        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -423,7 +367,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "long")
       ),
       term("window",
-        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -448,7 +392,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -471,7 +415,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -479,17 +423,6 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeSessionGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli as 'w) // require on a time attribute
-      .groupBy('string, 'w)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0..6bab4b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -197,14 +197,14 @@ class SqlITCase extends StreamingWithStateTestBase {
     // for sum aggregation ensure that every time the order of each element is consistent
     env.setParallelism(1)
 
-    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY ProcTime()  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -224,13 +224,13 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
       "CURRENT ROW)" +
       "from T1"
 
@@ -254,14 +254,14 @@ class SqlITCase extends StreamingWithStateTestBase {
     // for sum aggregation ensure that every time the order of each element is consistent
     env.setParallelism(1)
 
-    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY ProcTime()  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -281,12 +281,12 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -328,14 +328,14 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val t1 = env
       .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, a, " +
-      "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
       " from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -385,14 +385,14 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val t1 = env
       .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, a, " +
-      "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
-      "sum(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
+      "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -453,15 +453,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val t1 = env
       .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, b, " +
-      "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
       "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
       " preceding AND CURRENT ROW)" +
       " from T1"
 
@@ -525,15 +525,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val t1 = env
       .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, b, " +
-      "count(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+      "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
       "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+      ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
       " preceding AND CURRENT ROW)" +
       " from T1"
 
@@ -565,14 +565,14 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY b ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -592,15 +592,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "count(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "avg(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "max(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "min(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row) " +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -632,7 +632,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -670,15 +670,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "count(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "avg(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "max(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "min(b) over (" +
-      "partition by a order by rowtime() rows between unbounded preceding and current row) " +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -702,7 +702,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -740,11 +740,11 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
+      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -764,7 +764,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -795,11 +795,11 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
+      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -820,7 +820,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -852,11 +852,11 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "SUM(b) over (order by rowtime range between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime range between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime range between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime range between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime range between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -878,7 +878,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -916,15 +916,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime range between unbounded preceding and current row), " +
       "count(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime range between unbounded preceding and current row), " +
       "avg(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime range between unbounded preceding and current row), " +
       "max(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime range between unbounded preceding and current row), " +
       "min(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "partition by a order by rowtime range between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -946,7 +946,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     )
 
     val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
 
@@ -981,14 +981,15 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a,  " +
       " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
       " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
       " FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1023,14 +1024,15 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a,  " +
       " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
       " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
       " FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1066,14 +1068,15 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a,  " +
       " SUM(c) OVER (" +
-      " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
       " MIN(c) OVER (" +
-      " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
       " FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1108,14 +1111,15 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
     StreamITCase.testResults = mutable.MutableList()
 
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a,  " +
       " SUM(c) OVER (" +
-      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
       " MIN(c) OVER (" +
-      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
       " FROM MyTable"
     val result = tEnv.sql(sqlQuery).toDataStream[Row]
     result.addSink(new StreamITCase.StringSink)

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index cef2665..edf7b1d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -21,22 +21,23 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow}
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 class WindowAggregateTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+  streamUtil.addTable[(Int, String, Long)](
+    "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
   @Test
   def testNonPartitionedProcessingTimeBoundedWindow() = {
 
-    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
-      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA FROM MyTable"
-
-    val expected =
+    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime  " +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+      "FROM MyTable"
+      val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
@@ -44,11 +45,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
         ),
         term("select", "a", "w0$o0 AS $1")
       )
@@ -59,7 +60,7 @@ class WindowAggregateTest extends TableTestBase {
   @Test
   def testPartitionedProcessingTimeBoundedWindow() = {
 
-    val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
+    val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
       "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
       "FROM MyTable"
       val expected =
@@ -70,12 +71,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy","a"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
         ),
         term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
       )
@@ -84,23 +85,24 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testTumbleFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT " +
         "  COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " +
-        "  TUMBLE_END(rowtime(), INTERVAL '15' MINUTE)" +
+        "  TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
         "FROM MyTable " +
-        "GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
+        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
           "DataStreamAggregate",
           streamTableNode(0),
-          term("window", EventTimeTumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
           term("select",
             "COUNT(*) AS EXPR$0, " +
               "weightedAvg(c, a) AS wAvg, " +
@@ -113,23 +115,23 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testHoppingFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
-        "  HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
+        "  HOP_START(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
+        "  HOP_END(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
         "FROM MyTable " +
-        "GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
+        "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
     val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
           "DataStreamAggregate",
           streamTableNode(0),
-          term("window", ProcessingTimeSlidingGroupWindow('w$,
-            3600000.millis, 900000.millis)),
+          term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
           term("select",
             "COUNT(*) AS EXPR$0, " +
               "weightedAvg(c, a) AS wAvg, " +
@@ -142,23 +144,24 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testSessionFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT " +
         "  COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  SESSION_START(proctime(), INTERVAL '15' MINUTE), " +
-        "  SESSION_END(proctime(), INTERVAL '15' MINUTE) " +
+        "  SESSION_START(proctime, INTERVAL '15' MINUTE), " +
+        "  SESSION_END(proctime, INTERVAL '15' MINUTE) " +
         "FROM MyTable " +
-        "GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)"
+        "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
           "DataStreamAggregate",
           streamTableNode(0),
-          term("window", ProcessingTimeSessionGroupWindow('w$, 900000.millis)),
+          term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
           term("select",
             "COUNT(*) AS EXPR$0, " +
               "weightedAvg(c, a) AS wAvg, " +
@@ -175,7 +178,7 @@ class WindowAggregateTest extends TableTestBase {
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
@@ -185,7 +188,7 @@ class WindowAggregateTest extends TableTestBase {
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
-        "GROUP BY HOP(proctime(), INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+        "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
@@ -195,21 +198,21 @@ class WindowAggregateTest extends TableTestBase {
     val sqlQuery =
       "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
         "FROM MyTable " +
-        "GROUP BY SESSION(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+        "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
 
   @Test(expected = classOf[TableException])
   def testVariableWindowSize() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime(), c * INTERVAL '1' MINUTE)"
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
     streamUtil.verifySql(sql, "n/a")
   }
 
   @Test(expected = classOf[TableException])
   def testMultiWindow() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
-      "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
+      "FLOOR(rowtime TO HOUR), FLOOR(rowtime TO MINUTE)"
     val expected = ""
     streamUtil.verifySql(sql, expected)
   }
@@ -237,8 +240,8 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundPartitionedProcessingWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
     val expected =
@@ -249,12 +252,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
         ),
         term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
       )
@@ -265,7 +268,7 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundPartitionedProcessingWindowWithRow() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
       "CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -274,15 +277,11 @@ class WindowAggregateTest extends TableTestBase {
         "DataStreamCalc",
         unaryNode(
           "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
-          ),
+          streamTableNode(0),
           term("partitionBy", "c"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -293,8 +292,8 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundNonPartitionedProcessingWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
     val expected =
@@ -305,11 +304,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
         ),
         term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
       )
@@ -320,7 +319,7 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundNonPartitionedProcessingWindowWithRow() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
       "CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -329,14 +328,10 @@ class WindowAggregateTest extends TableTestBase {
         "DataStreamCalc",
         unaryNode(
           "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
-          ),
-          term("orderBy", "PROCTIME"),
+          streamTableNode(0),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -347,8 +342,8 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundNonPartitionedEventTimeWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
     val expected =
@@ -359,11 +354,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
         ),
         term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
       )
@@ -374,8 +369,8 @@ class WindowAggregateTest extends TableTestBase {
   def testUnboundPartitionedEventTimeWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
     val expected =
@@ -386,12 +381,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
         ),
         term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
       )
@@ -402,7 +397,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundPartitionedRowTimeWindowWithRow() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
       "CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -414,12 +409,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -430,7 +425,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundNonPartitionedRowTimeWindowWithRow() = {
     val sql = "SELECT " +
         "c, " +
-        "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+        "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
         "CURRENT ROW) as cnt1 " +
         "from MyTable"
 
@@ -442,11 +437,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -457,7 +452,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundPartitionedRowTimeWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY RowTime() " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
       "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -469,12 +464,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -485,7 +480,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundNonPartitionedRowTimeWindowWithRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY RowTime() " +
+      "count(a) OVER (ORDER BY rowtime " +
       "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -497,11 +492,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -512,7 +507,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
       "CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -524,11 +519,11 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )
@@ -539,7 +534,7 @@ class WindowAggregateTest extends TableTestBase {
   def testBoundPartitionedProcTimeWindowWithRowRange() = {
     val sql = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
       "CURRENT ROW) as cnt1 " +
       "from MyTable"
 
@@ -551,12 +546,12 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS $1")
       )

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 3651749..4a6a616 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -57,13 +57,13 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
 
     val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
 
     val countFun = new CountAggFunction
     val weightAvgFun = new WeightedAvg
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows as 'w)
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
@@ -102,7 +102,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(sessionWindowTestdata)
       .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
 
     val windowedTable = table
       .window(Session withGap 5.milli on 'rowtime as 'w)
@@ -126,12 +126,12 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
 
     val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
     val countFun = new CountAggFunction
     val weightAvgFun = new WeightedAvg
 
     val windowedTable = table
-      .window(Tumble over 2.rows as 'w)
+      .window(Tumble over 2.rows on 'proctime as 'w)
       .groupBy('w)
       .select(countFun('string), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
@@ -154,7 +154,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
     val countFun = new CountAggFunction
     val weightAvgFun = new WeightedAvg
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index 5969e91..1114cf0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -111,22 +111,6 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test(expected = classOf[TableException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
   def testAsWithToManyFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment