Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:20 UTC

[11/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
deleted file mode 100644
index 253d54a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ /dev/null
@@ -1,864 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
-  /**
-    * All aggregates must be computed on the same window.
-    */
-  @Test(expected = classOf[TableException])
-  def testMultiWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a, " +
-      "  SUM(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
-      "  MIN(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
-      "FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a, " +
-      "  SUM(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
-      "  MIN(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
-      "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,0",
-      "3,10,0",
-      "3,15,0",
-      "4,21,0",
-      "4,28,0",
-      "4,36,0",
-      "4,45,0",
-      "5,55,0",
-      "5,66,1",
-      "5,77,2",
-      "5,88,3",
-      "5,99,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // ensure a consistent element order on every run so the sum aggregation is deterministic
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT c, cnt1 from " +
-      "(SELECT " +
-      "c, " +
-      "count(a) " +
-      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
-      "as cnt1 from T1)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,1", "Hello World,2", "Hello World,3",
-      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
-    val queryConfig =
-      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // ensure a consistent element order on every run so the sum aggregation is deterministic
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "  c, b, " +
-      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
-      " FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      " c, a, " +
-      "  COUNT(a) " +
-      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) " +
-      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "  c, b, " +
-      "  COUNT(a) " +
-      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) " +
-      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
-      " FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
-
-    val data = Seq(
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))), // early row
-      Right(3L),
-      Left((2L, (2L, 2, "Hello"))), // late row
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(7L),
-      Left((9L, (9L, 9, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
-      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
-      "FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7",
-      "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15", "Hello World,7,3,18",
-      "Hello World,8,3,21", "Hello World,8,3,23",
-      "Hello World,9,3,25",
-      "Hello World,20,3,37")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,1,Hello,6,3,2,3,1",
-      "1,2,Hello,6,3,2,3,1",
-      "1,3,Hello world,6,3,2,3,1",
-      "1,1,Hi,7,4,1,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,6,3,2,3,1",
-      "2,3,Hello world,6,3,2,3,1",
-      "1,4,Hello world,11,5,2,4,1",
-      "1,5,Hello world,29,8,3,7,1",
-      "1,6,Hello world,29,8,3,7,1",
-      "1,7,Hello world,29,8,3,7,1",
-      "2,4,Hello world,15,5,3,5,1",
-      "2,5,Hello world,15,5,3,5,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Left(14000021L, (1, 6L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000024L, (3, 5L, "Hello world")),
-      Left(14000026L, (1, 7L, "Hello world")),
-      Left(14000025L, (1, 8L, "Hello world")),
-      Left(14000022L, (1, 9L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-             .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,1,Hello,1,1,1,1,1",
-      "1,1,Hello,7,4,1,3,1",
-      "1,2,Hello,7,4,1,3,1",
-      "1,3,Hello world,7,4,1,3,1",
-      "2,2,Hello world,12,6,2,3,1",
-      "2,3,Hello world,12,6,2,3,1",
-      "1,1,Hi,13,7,1,3,1",
-      "1,4,Hello world,17,8,2,4,1",
-      "1,5,Hello world,35,11,3,7,1",
-      "1,6,Hello world,35,11,3,7,1",
-      "1,7,Hello world,35,11,3,7,1",
-      "2,4,Hello world,44,13,3,7,1",
-      "2,5,Hello world,44,13,3,7,1")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      Left(14000004L, (3, 7L, "Hello world")),
-      Left(14000007L, (4, 9L, "Hello world")),
-      Left(14000008L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      // this element will be discarded because it is late
-      Left(14000008L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "3,7,Hello world,17,4,4,7,2",
-      "1,1,Hi,18,5,3,7,1",
-      "4,9,Hello world,27,6,4,9,1",
-      "5,8,Hello world,35,7,5,9,1",
-      "6,8,Hello world,43,8,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test unbounded event-time over window with partition by **/
-  @Test
-  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      // the next 3 elements are late
-      Left(14000008L, (1, 4L, "Hello world")),
-      Left(14000008L, (2, 3L, "Hello world")),
-      Left(14000008L, (3, 3L, "Hello world")),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      // the next 3 elements are late
-      Left(14000019L, (1, 6L, "Hello world")),
-      Left(14000018L, (2, 4L, "Hello world")),
-      Left(14000018L, (3, 4L, "Hello world")),
-      Left(14000022L, (2, 5L, "Hello world")),
-      Left(14000022L, (3, 5L, "Hello world")),
-      Left(14000024L, (1, 7L, "Hello world")),
-      Left(14000023L, (1, 8L, "Hello world")),
-      Left(14000021L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
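
For context: the deleted OverWindowITCase above exercised streaming SQL OVER aggregations end to end. Its core pattern, shown here as a minimal sketch drawn from the tests themselves (same 1.3-era API: TableEnvironment.getTableEnvironment, tEnv.sql, and a trailing 'proctime.proctime marker column):

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // append a processing-time attribute as the trailing 'proctime column
    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    tEnv.registerTable("T1", t1)

    // valid: both aggregates share one window spec (same PARTITION BY / ORDER BY)
    val result = tEnv.sql(
      "SELECT c, " +
      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
      "from T1").toAppendStream[Row]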

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
deleted file mode 100644
index 55633ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-class SqlITCase extends StreamingWithStateTestBase {
-
-   /** test a table registered from a Row DataStream **/
-  @Test
-  def testRowRegister(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
-
-    val data = List(
-      Row.of("Hello", "Worlds", Int.box(1)),
-      Row.of("Hello", "Hiden", Int.box(5)),
-      Row.of("Hello again", "Worlds", Int.box(2)))
-        
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO) // tpe is picked up automatically by fromCollection
-    
-    val ds = env.fromCollection(data)
-    
-    val t = ds.toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTableRow", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-    
-  /** test unbounded groupby (without window) **/
-  @Test
-  def testUnboundedGroupby(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
-    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  /** test selection **/
-  @Test
-  def testSelectExpressionFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("2,0", "4,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered table **/
-  @Test
-  def testSimpleFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered datastream **/
-  @Test
-  def testDatastreamFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-    tEnv.registerDataStream("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with registered tables **/
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM T1 " +
-      "UNION ALL " +
-      "SELECT * FROM T2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,1,Hi", "1,1,Hi",
-      "2,2,Hello", "2,2,Hello",
-      "3,2,Hello world", "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with filter **/
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT * FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello",
-      "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union of a table and a datastream **/
-  @Test
-  def testUnionTableWithDataSet(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT c FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.get3TupleDataStream(env)
-    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnnestPrimitiveArrayFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array(12, 45), Array(Array(12, 45))),
-      (2, Array(41, 5), Array(Array(18), Array(87))),
-      (3, Array(18, 42), Array(Array(1), Array(45)))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,[12, 45],12",
-      "1,[12, 45],45",
-      "2,[41, 5],41",
-      "2,[41, 5],5",
-      "3,[18, 42],18",
-      "3,[18, 42],42"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnnestArrayOfArrayFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array(12, 45), Array(Array(12, 45))),
-      (2, Array(41, 5), Array(Array(18), Array(87))),
-      (3, Array(18, 42), Array(Array(1), Array(45)))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,[12, 45]",
-      "2,[18]",
-      "2,[87]",
-      "3,[1]",
-      "3,[45]")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array((12, "45.6"), (12, "45.612"))),
-      (2, Array((13, "41.6"), (14, "45.2136"))),
-      (3, Array((18, "42.6")))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b)
-
-    val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,[(13,41.6), (14,45.2136)],14,45.2136",
-      "3,[(18,42.6)],18,42.6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
-
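
One detail in the deleted SqlITCase worth noting: append-only queries (projection, filter, union) are converted with toAppendStream, while queries whose results update over time (unbounded GROUP BY) need toRetractStream. A minimal sketch of the distinction, using the same tEnv as the tests:

    // append-only: every emitted row is final
    val appendStream = tEnv.sql("SELECT a * 2, b - 1 FROM MyTable").toAppendStream[Row]

    // updating: emits (Boolean, Row) pairs, where true adds a row and
    // false retracts a previously emitted row
    val retractStream = tEnv.sql("SELECT b, COUNT(a) FROM MyTable GROUP BY b")
      .toRetractStream[Row]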

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f95d0ab..cc9d786 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -18,8 +18,7 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestUtil._
@@ -31,17 +30,6 @@ class WindowAggregateTest extends TableTestBase {
   streamUtil.addTable[(Int, String, Long)](
     "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
-  /**
-    * An OVER clause is required for the [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    streamUtil.addFunction("overAgg", new OverAgg0)
-
-    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-    streamUtil.tEnv.sql(sqlQuery)
-  }
-
   @Test
   def testGroupbyWithoutWindow() = {
     val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
@@ -66,7 +54,7 @@ class WindowAggregateTest extends TableTestBase {
 
   @Test
   def testTumbleFunction() = {
-    streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT " +
@@ -95,7 +83,7 @@ class WindowAggregateTest extends TableTestBase {
 
   @Test
   def testHoppingFunction() = {
-    streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " +
@@ -123,7 +111,7 @@ class WindowAggregateTest extends TableTestBase {
 
   @Test
   def testSessionFunction() = {
-    streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
     val sql =
       "SELECT " +
@@ -177,51 +165,4 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
-  @Test(expected = classOf[TableException])
-  def testTumbleWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testHopWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testSessionWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testVariableWindowSize() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testWindowUdAggInvalidArgs(): Unit = {
-    streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
 }
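
The offset-related negative tests removed here reappear in WindowAggregateValidationTest below. For reference, a group window is only valid without the trailing TIME offset argument; a sketch of the offset-free form of the removed TUMBLE query:

    // valid: time attribute and window size only, no TIME '10:00:00' offset
    val sql =
      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
        "FROM MyTable " +
        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR)"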

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
new file mode 100644
index 0000000..b1efc09
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.apache.flink.table.expressions.AggFunctionCall
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Ignore, Test}
+
+class AggregationsValidationTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+  /**
+    * An OVER clause is required for the [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testOverAggregation(): Unit = {
+    streamUtil.addFunction("overAgg", new OverAgg0)
+
+    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
+
+    streamUtil.tableEnv.sql(sqlQuery)
+  }
+
+  @Test
+  def testDistinct(): Unit = {
+    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        streamTableNode(0),
+        term("groupBy", "a, b, c"),
+        term("select", "a, b, c")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
+  // TODO: re-enable this test once FLINK-7144 is fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a")
+        ),
+        term("groupBy", "a"),
+        term("select", "a")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+
+  @Test
+  def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
+    streamUtil.addFunction("udag", new MyAgg)
+    val call = streamUtil
+      .tEnv
+      .functionCatalog
+      .lookupFunction("udag", Seq())
+      .asInstanceOf[AggFunctionCall]
+
+    val typeInfo = call.accTypeInfo
+    assertTrue(typeInfo.isInstanceOf[CaseClassTypeInfo[_]])
+    assertEquals(2, typeInfo.getTotalFields)
+    val caseTypeInfo = typeInfo.asInstanceOf[CaseClassTypeInfo[_]]
+    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(0))
+    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(1))
+
+    streamUtil.addFunction("udag2", new MyAgg2)
+    val call2 = streamUtil
+      .tEnv
+      .functionCatalog
+      .lookupFunction("udag2", Seq())
+      .asInstanceOf[AggFunctionCall]
+
+    val typeInfo2 = call2.accTypeInfo
+    assertTrue(s"actual type: $typeInfo2", typeInfo2.isInstanceOf[RowTypeInfo])
+    assertEquals(2, typeInfo2.getTotalFields)
+    val rowTypeInfo = typeInfo2.asInstanceOf[RowTypeInfo]
+    assertEquals(Types.LONG, rowTypeInfo.getTypeAt(0))
+    assertEquals(Types.INT, rowTypeInfo.getTypeAt(1))
+  }
+}
+
+case class MyAccumulator(var sum: Long, var count: Long)
+
+class MyAgg extends AggregateFunction[Long, MyAccumulator] {
+
+  // Overloaded accumulate method (empty body: the test only inspects the accumulator type)
+  def accumulate(acc: MyAccumulator, value: Long): Unit = {
+  }
+
+  override def createAccumulator(): MyAccumulator = MyAccumulator(0, 0)
+
+  override def getValue(accumulator: MyAccumulator): Long = 1L
+}
+
+class MyAgg2 extends AggregateFunction[Long, Row] {
+
+  def accumulate(acc: Row, value: Long): Unit = {}
+
+  override def createAccumulator(): Row = new Row(2)
+
+  override def getValue(accumulator: Row): Long = 1L
+
+  def getAccumulatorType: TypeInformation[_] = new RowTypeInfo(Types.LONG, Types.INT)
+}
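
MyAgg above leaves accumulate empty on purpose: the test only inspects the accumulator's TypeInformation, never the aggregated value. A working update would look roughly like this (an illustrative sketch, not part of the commit):

    def accumulate(acc: MyAccumulator, value: Long): Unit = {
      acc.sum += value   // running sum of the aggregated column
      acc.count += 1     // number of rows seen so far
    }
    // getValue could then derive a result from the accumulator, e.g.
    // an integer average: if (acc.count == 0) 0L else acc.sum / acc.count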

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..3cc13f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime.proctime)
+
+  /**
+    * All aggregates must be computed on the same window.
+    */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+  }
+}

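The rejected query above uses two different over-window specifications (PARTITION BY c vs. PARTITION BY b), and all aggregates of one SELECT must share a single window. A sketch of the shape this validation accepts, against the same T1, with an identical OVER clause for every aggregate:

    val okQuery = "SELECT c, " +
      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED PRECEDING), " +
      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED PRECEDING) " +
      "FROM T1"

    streamUtil.tableEnv.sql(okQuery).toAppendStream[Row]  // passes validation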
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
new file mode 100644
index 0000000..0fd8740
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class WindowAggregateValidationTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+    "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+  /**
+    * An OVER clause is required for the [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testOverAggregation(): Unit = {
+    streamUtil.addFunction("overAgg", new OverAgg0)
+
+    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
+    streamUtil.tableEnv.sql(sqlQuery)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testTumbleWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testHopWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testSessionWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testVariableWindowSize(): Unit = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWindowUdAggInvalidArgs(): Unit = {
+    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+}

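The three offset tests fail for the same reason: in this SQL dialect TUMBLE, HOP and SESSION accept only a time attribute plus size (and slide, for HOP) or gap arguments, so the trailing TIME '10:00:00' offset is rejected. Sketches of the accepted forms against the same MyTable:

    val tumble  = "SELECT SUM(a) FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR)"
    val hop     = "SELECT SUM(a) FROM MyTable " +
      "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR)"
    val session = "SELECT SUM(a) FROM MyTable GROUP BY SESSION(proctime, INTERVAL '2' HOUR)"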
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
deleted file mode 100644
index adf4d44..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // verify ProjectMergeRule.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithTooManyFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test(expected = classOf[TableException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test not-equals filter on an Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

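A note on the predicates exercised by the removed ITCase: the Table API overloads === and !== for equality tests because Scala already claims == and !=, and Literal wraps a constant into an expression. Condensed, with the same semantics as the tests above (table t assumed bound to fields 'a, 'b, 'c):

    t.filter('a === 3)         // keep rows whose Int field equals 3
    t.filter('a % 2 !== 0)     // keep rows with odd 'a
    t.filter(Literal(false))   // constant predicate: rejects every row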
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
new file mode 100644
index 0000000..20a88fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = replaceString(tEnv.explain(table))
+
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(expect, result)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = replaceString(tEnv.explain(table))
+
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(expect, result)
+  }
+
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored because ids keep incrementing for as long as the
+     * StreamExecutionEnvironment of the test class is up.
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
+}

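tEnv.explain(table) renders the table's abstract syntax tree together with the optimized logical and physical plans as one string, which these tests compare against checked-in .out files. Ad-hoc usage looks like this (a sketch; the printed stage ids vary per environment, hence the Stage-scrubbing in replaceString above):

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = env.fromElements((1, "hello"))
      .toTable(tEnv, 'count, 'word)
      .filter('count > 0)

    println(tEnv.explain(t))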
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..fec25eb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ExpressionReductionTest extends TableTestBase {
+
+  @Test
+  def testReduceCalcExpression(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceProjectExpression(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result =  table
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceFilterExpression(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}

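What the expected plans above demonstrate: constant subexpressions are reduced once at optimization time rather than evaluated per record. Worked out for two of the projected terms: (3 + 4).toExpr + 6 folds to the literal 13 ("13 AS _c0"), and "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second advances the timestamp by ten days and one second to 1990-10-24 23:00:01.123 ("_c4").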
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
new file mode 100644
index 0000000..3bf9c33
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils._
+import org.junit.Test
+
+/**
+  * Tests for all situations in which field projection can be applied, such as
+  * selecting only a few fields from a source with many fields.
+  */
+class FieldProjectionTest extends TableTestBase {
+
+  val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testSelectFromWindow(): Unit = {
+    val sourceTable = streamUtil
+      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
+    val resultTable = sourceTable
+        .window(Tumble over 5.millis on 'rowtime as 'w)
+        .groupBy('w)
+        .select(Upper('c).count, 'a.sum)
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
+        ),
+        term("window",
+          TumblingGroupWindow(
+            WindowReference("w"),
+            'rowtime,
+            5.millis)),
+        term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
+      )
+
+    streamUtil.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testSelectFromGroupedWindow(): Unit = {
+    val sourceTable = streamUtil
+      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
+    val resultTable = sourceTable
+        .window(Tumble over 5.millis on 'rowtime as 'w)
+        .groupBy('w, 'b)
+        .select(Upper('c).count, 'a.sum, 'b)
+
+    val expected = unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
+          ),
+          term("groupBy", "b"),
+          term("window",
+            TumblingGroupWindow(
+              WindowReference("w"),
+              'rowtime,
+              5.millis)),
+          term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
+        ),
+        term("select", "TMP_0", "TMP_1", "b")
+    )
+
+    streamUtil.verifyTable(resultTable, expected)
+  }
+
+}

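Both expected plans share one projection pattern: a DataStreamCalc is pushed below the window aggregate so that unused fields never enter windowing state, and UPPER(c) is pre-computed in that same Calc. Schematically, for the grouped variant:

    source (a, b, c, d, rowtime)
      -> DataStreamCalc: c, a, b, rowtime, UPPER(c) AS $f4     // 'd' dropped early
      -> DataStreamGroupWindowAggregate: groupBy b, COUNT($f4), SUM(a)
      -> DataStreamCalc: TMP_0, TMP_1, b                       // final field order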
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
deleted file mode 100644
index 4a3524b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * Tests for non-windowed groupBy aggregations.
-  */
-class GroupAggregationsITCase extends StreamingWithStateTestBase {
-  private val queryConfig = new StreamQueryConfig()
-  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
-
-  @Test
-  def testDistinct(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('b).distinct()
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .groupBy('e).select('e, 'a.count).distinct()
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = mutable.MutableList("1,5", "2,7", "3,3")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testNonKeyedGroupAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-            .select('a.sum, 'b.sum)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = List("231,91")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testGroupAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink)
-    env.execute()
-
-    val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testDoubleGroupAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.count as 'cnt, 'b)
-      .groupBy('cnt)
-      .select('cnt, 'b.count as 'freq)
-
-    val results = t.toRetractStream[Row](queryConfig)
-
-    results.addSink(new RetractingSink)
-    env.execute()
-    val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testGroupAggregateWithExpression(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .groupBy('e, 'b % 3)
-      .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new RetractingSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,1,1,1", "7,1,4,2", "2,1,3,2",
-      "3,2,3,3", "1,2,3,3", "14,2,5,1",
-      "12,3,5,1", "5,3,4,2")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-}
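
Since these non-windowed aggregations update their results continuously, every test converts with toRetractStream rather than toAppendStream. The resulting stream carries (flag, row) pairs; a minimal consumption sketch, with t being any of the tables above:

    // flag == true is an accumulate (add/update) message,
    // flag == false retracts a previously emitted row.
    val retracts = t.toRetractStream[Row](queryConfig)

    retracts
      .filter(_._1)   // keep only accumulate messages
      .map(_._2)      // unwrap the Row
      .print()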