Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/07 12:18:30 UTC

[3/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.

[FLINK-6257] [table] Refactor OVER window tests.

This closes #3697.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f2293cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f2293cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f2293cf

Branch: refs/heads/master
Commit: 9f2293cfdab960246fe1aea1d705eea18a011761
Parents: e5b65a7
Author: sunjincheng121 <su...@gmail.com>
Authored: Fri Apr 7 19:28:19 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200

----------------------------------------------------------------------
 .../api/scala/stream/sql/OverWindowITCase.scala | 878 ++++++++++++++++
 .../api/scala/stream/sql/OverWindowTest.scala   | 503 ++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 992 +------------------
 .../scala/stream/sql/WindowAggregateTest.scala  | 380 +------
 .../table/runtime/harness/HarnessTestBase.scala |  87 ++
 .../runtime/harness/OverWindowHarnessTest.scala | 974 ++++++++++++++++++
 6 files changed, 2444 insertions(+), 1370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..a7fe1c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+  /**
+    * All aggregates must be computed on the same window.
+    */
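+  // Note (added comment): the query below applies aggregates over two different
+  // OVER windows (PARTITION BY c and PARTITION BY b), so translation is expected
+  // to fail with the TableException asserted above.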
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+      "FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+      "  MIN(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+      "FROM MyTable"
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,0",
+      "2,3,0",
+      "3,6,0",
+      "3,10,0",
+      "3,15,0",
+      "4,21,0",
+      "4,28,0",
+      "4,36,0",
+      "4,45,0",
+      "5,55,0",
+      "5,66,1",
+      "5,77,2",
+      "5,88,3",
+      "5,99,4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // use parallelism 1 so that elements arrive in a deterministic order and
+    // the sum aggregation produces stable results
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) " +
+      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // use parallelism 1 so that elements arrive in a deterministic order and
+    // the sum aggregation produces stable results
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val data = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      " c, a, " +
+      "  COUNT(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15",
+      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val data = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
+      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Left(14000021L, (1, 6L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000024L, (3, 5L, "Hello world")),
+      Left(14000026L, (1, 7L, "Hello world")),
+      Left(14000025L, (1, 8L, "Hello world")),
+      Left(14000022L, (1, 9L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+             .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "2,1,Hello,1,1,1,1,1",
+      "1,1,Hello,7,4,1,3,1",
+      "1,2,Hello,7,4,1,3,1",
+      "1,3,Hello world,7,4,1,3,1",
+      "2,2,Hello world,12,6,2,3,1",
+      "2,3,Hello world,12,6,2,3,1",
+      "1,1,Hi,13,7,1,3,1",
+      "1,4,Hello world,17,8,2,4,1",
+      "1,5,Hello world,35,11,3,7,1",
+      "1,6,Hello world,35,11,3,7,1",
+      "1,7,Hello world,35,11,3,7,1",
+      "2,4,Hello world,44,13,3,7,1",
+      "2,5,Hello world,44,13,3,7,1")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      Left(14000004L, (3, 7L, "Hello world")),
+      Left(14000007L, (4, 9L, "Hello world")),
+      Left(14000008L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      // this element will be discarded because it is late
+      Left(14000008L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello,2,1,2,2,2",
+      "3,5,Hello,7,2,3,5,2",
+      "1,3,Hello,10,3,3,5,2",
+      "3,7,Hello world,17,4,4,7,2",
+      "1,1,Hi,18,5,3,7,1",
+      "4,9,Hello world,27,6,4,9,1",
+      "5,8,Hello world,35,7,5,9,1",
+      "6,8,Hello world,43,8,5,9,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test event-time unbounded OVER window with partition by and late elements */
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      // the next 3 elements are late
+      Left(14000008L, (1, 4L, "Hello world")),
+      Left(14000008L, (2, 3L, "Hello world")),
+      Left(14000008L, (3, 3L, "Hello world")),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      // the next 3 elements are late
+      Left(14000019L, (1, 6L, "Hello world")),
+      Left(14000018L, (2, 4L, "Hello world")),
+      Left(14000018L, (3, 4L, "Hello world")),
+      Left(14000022L, (2, 5L, "Hello world")),
+      Left(14000022L, (3, 5L, "Hello world")),
+      Left(14000024L, (1, 7L, "Hello world")),
+      Left(14000023L, (1, 8L, "Hello world")),
+      Left(14000021L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object OverWindowITCase {
+
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
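+    // Added comment: Left entries are (timestamp, record) pairs emitted via
+    // collectWithTimestamp, Right entries are watermark timestamps emitted via
+    // emitWatermark, so each test can control event time and late data explicitly.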
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    override def cancel(): Unit = ???
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
new file mode 100644
index 0000000..711b31b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+    "MyTable",
+    'a, 'b, 'c,
+    'proctime.proctime, 'rowtime.rowtime)
+
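+  // Added comment: the tests below only verify the translated DataStream plan via
+  // verifySql; runtime behavior of the same queries is covered by OverWindowITCase.
+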
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver() = {
+
+    val sqlQuery =
+      "SELECT a, " +
+        "  AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
+        "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
+        "FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "proctime",
+            "COUNT(c) AS w0$o0",
+            "$SUM0(c) AS w0$o1"
+          )
+        ),
+        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRangeOver() = {
+
+    val sqlQuery =
+      "SELECT a, " +
+        "  COUNT(c) OVER (ORDER BY proctime " +
+        "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+      "FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS $1")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
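+    // Added comment: SUM(a) is translated into $SUM0(a) plus a CASE that returns
+    // null when the window is empty (COUNT(a) is 0), which is why the projection
+    // below contains CASE(>(w0$o0, 0), CAST(w0$o1), null).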
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          streamTableNode(0),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          streamTableNode(0),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
+      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime " +
+      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "$SUM0(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "c",
+          "w0$o0 AS cnt1",
+          "CASE(>(w0$o0, 0)",
+          "CAST(w0$o1), null) AS cnt2"
+        )
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRangeOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRowsOver() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "$SUM0(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "c",
+          "w0$o0 AS cnt1",
+          "CASE(>(w0$o0, 0)",
+          "CAST(w0$o1), null) AS cnt2"
+        )
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 249d505..95366e1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,34 +19,16 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-
-import scala.collection.mutable
 
 class SqlITCase extends StreamingWithStateTestBase {
 
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
   /** test unbounded groupby (without window) **/
   @Test
   def testUnboundedGroupby(): Unit = {
@@ -208,976 +190,4 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // for sum aggregation ensure that every time the order of each element is consistent
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,1", "Hello World,2", "Hello World,3",
-      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRange(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // for sum aggregation ensure that every time the order of each element is consistent
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundNonPartitionedEventTimeWindowWithRow(): Unit = {
-
-    val data = Seq(
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))), // early row
-      Right(3L),
-      Left((2L, (2L, 2, "Hello"))), // late row
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(7L),
-      Left((9L, (9L, 9, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
-      "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7",
-      "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15", "Hello World,7,3,18",
-      "Hello World,8,3,21", "Hello World,8,3,23",
-      "Hello World,9,3,25",
-      "Hello World,20,3,37")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, b, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      " preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, b, " +
-      "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      " preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /**
-    * All aggregates must be computed on the same window.
-    */
-  @Test(expected = classOf[TableException])
-  def testMultiWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-  }
-
-  /** test sliding event-time unbounded window with partition by **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      // the next 3 elements are late
-      Left(14000008L, (1, 4L, "Hello world")),
-      Left(14000008L, (2, 3L, "Hello world")),
-      Left(14000008L, (3, 3L, "Hello world")),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      // the next 3 elements are late
-      Left(14000019L, (1, 6L, "Hello world")),
-      Left(14000018L, (2, 4L, "Hello world")),
-      Left(14000018L, (3, 4L, "Hello world")),
-      Left(14000022L, (2, 5L, "Hello world")),
-      Left(14000022L, (3, 5L, "Hello world")),
-      Left(14000024L, (1, 7L, "Hello world")),
-      Left(14000023L, (1, 8L, "Hello world")),
-      Left(14000021L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window with partition by, executed with default parallelism **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Left(14000024L, (3, 5L, "Hello world")),
-      Left(14000026L, (1, 7L, "Hello world")),
-      Left(14000025L, (1, 8L, "Hello world")),
-      Left(14000022L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window without partition by **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      Left(14000004L, (3, 7L, "Hello world")),
-      Left(14000007L, (4, 9L, "Hello world")),
-      Left(14000008L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      // this element will be discarded because it is late
-      Left(14000008L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "3,7,Hello world,17,4,4,7,2",
-      "1,1,Hi,18,5,3,7,1",
-      "4,9,Hello world,27,6,4,9,1",
-      "5,8,Hello world,35,7,5,9,1",
-      "6,8,Hello world,43,8,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window without partition by and early-arriving rows **/
-  @Test
-  def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      // next three elements are early
-      Left(14000012L, (3, 7L, "Hello world")),
-      Left(14000013L, (4, 9L, "Hello world")),
-      Left(14000014L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      Left(14000011L, (6, 8L, "Hello world")),
-      // next element is early
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "1,1,Hi,11,4,2,5,1",
-      "6,8,Hello world,19,5,3,8,1",
-      "3,7,Hello world,26,6,4,8,1",
-      "4,9,Hello world,35,7,5,9,1",
-      "5,8,Hello world,43,8,5,9,1",
-      "6,8,Hello world,51,9,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time non-partitioned unbounded RANGE window **/
-  @Test
-  def testUnboundedNonPartitionedEventTimeRangeWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime range between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,1,Hello,1,1,1,1,1",
-      "1,1,Hello,7,4,1,3,1",
-      "1,2,Hello,7,4,1,3,1",
-      "1,3,Hello world,7,4,1,3,1",
-      "2,2,Hello world,12,6,2,3,1",
-      "2,3,Hello world,12,6,2,3,1",
-      "1,1,Hi,13,7,1,3,1",
-      "1,4,Hello world,17,8,2,4,1",
-      "1,5,Hello world,35,11,3,7,1",
-      "1,6,Hello world,35,11,3,7,1",
-      "1,7,Hello world,35,11,3,7,1",
-      "2,4,Hello world,44,13,3,7,1",
-      "2,5,Hello world,44,13,3,7,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time partitioned unbounded RANGE window **/
-  @Test
-  def testUnboundedPartitionedEventTimeRangeWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,1,Hello,6,3,2,3,1",
-      "1,2,Hello,6,3,2,3,1",
-      "1,3,Hello world,6,3,2,3,1",
-      "1,1,Hi,7,4,1,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,6,3,2,3,1",
-      "2,3,Hello world,6,3,2,3,1",
-      "1,4,Hello world,11,5,2,4,1",
-      "1,5,Hello world,29,8,3,7,1",
-      "1,6,Hello world,29,8,3,7,1",
-      "1,7,Hello world,29,8,3,7,1",
-      "2,4,Hello world,15,5,3,5,1",
-      "2,5,Hello world,15,5,3,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testPartitionedProcTimeOverWindow(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,24,7",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,36,11",
-      "5,39,12")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testPartitionedProcTimeOverWindow2(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test
-  def testNonPartitionedProcTimeOverWindow(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,1",
-      "3,9,2",
-      "3,12,3",
-      "4,15,4",
-      "4,18,5",
-      "4,21,6",
-      "4,24,7",
-      "5,27,8",
-      "5,30,9",
-      "5,33,10",
-      "5,36,11",
-      "5,39,12")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNonPartitionedProcTimeOverWindow2(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,0",
-      "3,10,0",
-      "3,15,0",
-      "4,21,0",
-      "4,28,0",
-      "4,36,0",
-      "4,45,0",
-      "5,55,0",
-      "5,66,1",
-      "5,77,2",
-      "5,88,3",
-      "5,99,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
-
-object SqlITCase {
-
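-  /**
-    * Source function that emits each Left entry as a record with the given event-time
-    * timestamp and each Right entry as a watermark.
-    */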
-  class EventTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = ???
-  }
 }