You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:12 UTC

[03/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..15828eb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
@@ -0,0 +1,838 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+ @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,0",
+      "2,3,0",
+      "3,6,0",
+      "3,10,0",
+      "3,15,0",
+      "4,21,0",
+      "4,28,0",
+      "4,36,0",
+      "4,45,0",
+      "5,55,0",
+      "5,66,1",
+      "5,77,2",
+      "5,88,3",
+      "5,99,4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // for sum aggregation ensure that every time the order of each element is consistent
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT c, cnt1 from " +
+      "(SELECT " +
+      "c, " +
+      "count(a) " +
+      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+      "as cnt1 from T1)"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val queryConfig =
+      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // for sum aggregation ensure that every time the order of each element is consistent
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val data = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      " c, a, " +
+      "  COUNT(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15",
+      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val data = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
+      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Left(14000021L, (1, 6L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000024L, (3, 5L, "Hello world")),
+      Left(14000026L, (1, 7L, "Hello world")),
+      Left(14000025L, (1, 8L, "Hello world")),
+      Left(14000022L, (1, 9L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+             .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "2,1,Hello,1,1,1,1,1",
+      "1,1,Hello,7,4,1,3,1",
+      "1,2,Hello,7,4,1,3,1",
+      "1,3,Hello world,7,4,1,3,1",
+      "2,2,Hello world,12,6,2,3,1",
+      "2,3,Hello world,12,6,2,3,1",
+      "1,1,Hi,13,7,1,3,1",
+      "1,4,Hello world,17,8,2,4,1",
+      "1,5,Hello world,35,11,3,7,1",
+      "1,6,Hello world,35,11,3,7,1",
+      "1,7,Hello world,35,11,3,7,1",
+      "2,4,Hello world,44,13,3,7,1",
+      "2,5,Hello world,44,13,3,7,1")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      Left(14000004L, (3, 7L, "Hello world")),
+      Left(14000007L, (4, 9L, "Hello world")),
+      Left(14000008L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      // this element will be discard because it is late
+      Left(14000008L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello,2,1,2,2,2",
+      "3,5,Hello,7,2,3,5,2",
+      "1,3,Hello,10,3,3,5,2",
+      "3,7,Hello world,17,4,4,7,2",
+      "1,1,Hi,18,5,3,7,1",
+      "4,9,Hello world,27,6,4,9,1",
+      "5,8,Hello world,35,7,5,9,1",
+      "6,8,Hello world,43,8,5,9,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      // the next 3 elements are late
+      Left(14000008L, (1, 4L, "Hello world")),
+      Left(14000008L, (2, 3L, "Hello world")),
+      Left(14000008L, (3, 3L, "Hello world")),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      // the next 3 elements are late
+      Left(14000019L, (1, 6L, "Hello world")),
+      Left(14000018L, (2, 4L, "Hello world")),
+      Left(14000018L, (3, 4L, "Hello world")),
+      Left(14000022L, (2, 5L, "Hello world")),
+      Left(14000022L, (3, 5L, "Hello world")),
+      Left(14000024L, (1, 7L, "Hello world")),
+      Left(14000023L, (1, 8L, "Hello world")),
+      Left(14000021L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
new file mode 100644
index 0000000..ec719d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+class SqlITCase extends StreamingWithStateTestBase {
+
+   /** test row stream registered table **/
+  @Test
+  def testRowRegister(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
+
+    val data = List(
+      Row.of("Hello", "Worlds", Int.box(1)),
+      Row.of("Hello", "Hiden", Int.box(5)),
+      Row.of("Hello again", "Worlds", Int.box(2)))
+        
+    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically 
+    
+    val ds = env.fromCollection(data)
+    
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTableRow", t)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+    
+  /** test unbounded groupBy (without window) **/
+  @Test
+  def testUnboundedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  /** test selection **/
+  @Test
+  def testSelectExpressionFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("2,0", "4,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered table **/
+  @Test
+  def testSimpleFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered datastream **/
+  @Test
+  def testDatastreamFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+    tEnv.registerDataStream("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with registered tables **/
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM T1 " +
+      "UNION ALL " +
+      "SELECT * FROM T2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi", "1,1,Hi",
+      "2,2,Hello", "2,2,Hello",
+      "3,2,Hello world", "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with filter **/
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT * FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "2,2,Hello",
+      "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union of a table and a datastream **/
+  @Test
+  def testUnionTableWithDataSet(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT c FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.get3TupleDataStream(env)
+    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45],12",
+      "1,[12, 45],45",
+      "2,[41, 5],41",
+      "2,[41, 5],5",
+      "3,[18, 42],18",
+      "3,[18, 42],42"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestArrayOfArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45]",
+      "2,[18]",
+      "2,[87]",
+      "3,[1]",
+      "3,[45]")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b)
+
+    val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "2,[(13,41.6), (14,45.2136)],14,45.2136",
+      "3,[(18,42.6)],18,42.6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
new file mode 100644
index 0000000..31e7c5a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.CommonTestData
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testCsvTableSource(): Unit = {
+
+    val csvTable = CommonTestData.getCsvTableSource
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTableSource("persons", csvTable)
+
+    tEnv.sql(
+      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+      .toAppendStream[Row]
+      .addSink(new StreamITCase.StringSink[Row])
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
new file mode 100644
index 0000000..2e8f206
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2}
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.UserDefinedFunctionTestUtils
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSelectFirst(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+
+    // verify ProjectMergeRule.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+ @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    val results = filterDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    val results = filterDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val results = filterDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    val results = filterDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    val results = filterDs.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = mutable.MutableList(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedFunctionWithParameter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerFunction("RichFunc2", new RichFunc2)
+    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "ABC"))
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val result = StreamTestData.get3TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .where("RichFunc2(c)='ABC#Hello'")
+      .select('c)
+
+    val results = result.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("Hello")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testMultipleUserDefinedFunctions(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerFunction("RichFunc1", new RichFunc1)
+    tEnv.registerFunction("RichFunc2", new RichFunc2)
+    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "Abc"))
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val result = StreamTestData.get3TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
+      .select('c)
+
+    val results = result.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
new file mode 100644
index 0000000..8df14e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.datastream.StreamITCase.RetractingSink
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Tests of groupby (without window) aggregations
+  */
+class GroupAggregationsITCase extends StreamingWithStateTestBase {
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+
+  @Test
+  def testDistinct(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('b).distinct()
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .groupBy('e).select('e, 'a.count).distinct()
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList("1,5", "2,7", "3,3")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testNonKeyedGroupAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+            .select('a.sum, 'b.sum)
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = List("231,91")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testGroupAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testDoubleGroupAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('a.count as 'cnt, 'b)
+      .groupBy('cnt)
+      .select('cnt, 'b.count as 'freq)
+
+    val results = t.toRetractStream[Row](queryConfig)
+
+    results.addSink(new RetractingSink)
+    env.execute()
+    val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testGroupAggregateWithExpression(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new RetractingSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,1,1,1", "7,1,4,2", "2,1,3,2",
+      "3,2,3,3", "1,2,3,3", "14,2,5,1",
+      "12,3,5,1", "5,3,4,2")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..1af85d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.runtime.datastream.table.GroupWindowAggregationsITCase._
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  val data2 = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toAppendStream[Row](queryConfig)
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
+                       "Hello,2,2,3,2", "Hi,1,1,1,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    //To verify the "merge" functionality, we create this test with the following characteristics:
+    // 1. set the Parallelism to 1, and have the test data out of order
+    // 2. create a waterMark with 10ms offset to delay the window emission by 10ms
+    val sessionWindowTestdata = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (8L, 8, "Hello"),
+      (9L, 9, "Hello World"),
+      (4L, 4, "Hello"),
+      (16L, 16, "Hello"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvgWithMerge
+
+    val stream = env
+      .fromCollection(sessionWindowTestdata)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Session withGap 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w)
+      .select(countFun('string), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toAppendStream[Row](queryConfig)
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq("2,1,1,1", "2,2,6,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](0L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
+              weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testGroupWindowWithoutKeyInProjection(): Unit = {
+    val data = List(
+      (1L, 1, "Hi", 1, 1),
+      (2L, 2, "Hello", 2, 2),
+      (4L, 2, "Hello", 2, 2),
+      (8L, 3, "Hello world", 3, 3),
+      (16L, 3, "Hello world", 3, 3))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq("12", "8", "2", "3", "1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+
+  // ----------------------------------------------------------------------------------------------
+  // Sliding windows
+  // ----------------------------------------------------------------------------------------------
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 2.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 4.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data2)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+      .map(t => (t._2, t._6))
+    val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowAggregationsITCase {
+
+  class TimestampAndWatermarkWithOffset[T <: Product](
+    offset: Long) extends AssignerWithPunctuatedWatermarks[T] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: T,
+        extractedTimestamp: Long): Watermark = {
+      new Watermark(extractedTimestamp - offset)
+    }
+
+    override def extractTimestamp(
+        element: T,
+        previousElementTimestamp: Long): Long = {
+      element.productElement(0).asInstanceOf[Long]
+    }
+  }
+
+}