Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/07 12:18:28 UTC

[1/3] flink git commit: [FLINK-6033] [table] Add support for SQL UNNEST.

Repository: flink
Updated Branches:
  refs/heads/master e5b65a7fc -> fe4e96a72


[FLINK-6033] [table] Add support for SQL UNNEST.

This closes #3793.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4e96a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4e96a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4e96a7

Branch: refs/heads/master
Commit: fe4e96a726dd32fb948db050b975312e120e2461
Parents: 9f2293c
Author: Shuyi Chen <sh...@uber.com>
Authored: Fri Apr 21 23:48:28 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |   2 +
 .../flink/table/calcite/FlinkTypeFactory.scala  |   5 +-
 .../utils/UserDefinedFunctionUtils.scala        |   6 +-
 .../flink/table/plan/nodes/FlinkRelNode.scala   |   3 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +
 .../plan/rules/logical/LogicalUnnestRule.scala  | 134 +++++++++++++++++++
 .../table/plan/util/ExplodeFunctionUtil.scala   |  91 +++++++++++++
 .../flink/table/typeutils/TypeCheckUtils.scala  |   4 +-
 .../table/api/scala/batch/sql/JoinITCase.scala  |  54 ++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  |  90 +++++++++++++
 10 files changed, 388 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index a77d994..d105188 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1482,6 +1482,7 @@ val result2 = tableEnv.sql(
 #### Limitations
 
 Joins, set operations, and non-windowed aggregations are not supported yet.
+`UNNEST` supports only arrays and does not support `WITH ORDINALITY` yet.
 
 {% top %}
 
@@ -1690,6 +1691,7 @@ tableReference:
 tablePrimary:
   [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
   | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
+  | UNNEST '(' expression ')'
 
 values:
   VALUES expression [, expression ]*
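
As a quick orientation, the new syntax enables queries like the following (a condensed Scala sketch of the batch test added below, where table `T` has an array column `c`):

    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
    val result = tEnv.sql(sqlQuery) // one output row per array element, paired with a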

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 9281ad8..eba1623 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
@@ -180,6 +180,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     case pa: PrimitiveArrayTypeInfo[_] =>
       new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
 
+    case ba: BasicArrayTypeInfo[_, _] =>
+      new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+
     case oa: ObjectArrayTypeInfo[_, _] =>
       new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
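
For context, the three array cases now covered map onto Flink's array type information classes: PrimitiveArrayTypeInfo for arrays of primitives (hence the non-nullable component, `false`), and BasicArrayTypeInfo and ObjectArrayTypeInfo for arrays of boxed or arbitrary object types (nullable component, `true`). A small sketch using the predefined constants:

    import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo}

    val intArray    = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO // int[]
    val stringArray = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO            // String[]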
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 689bf0e..11174de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -436,7 +436,11 @@ object UserDefinedFunctionUtils {
     expected.isPrimitive && Primitives.wrap(expected) == candidate ||
     candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt])  ||
     candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
-    candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
+    candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) ||
+    (candidate.isArray &&
+      expected.isArray &&
+      candidate.getComponentType.isInstanceOf[Object] &&
+      expected.getComponentType == classOf[Object])
 
   @throws[Exception]
   def serialize(function: UserDefinedFunction): String = {
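
A minimal illustration of what the added condition admits (hypothetical values, not from the commit): an argument of class Array[String] is now considered assignable to an eval parameter declared as Array[Object]:

    val candidate = classOf[Array[String]] // actual argument class
    val expected  = classOf[Array[Object]] // declared parameter class
    val ok = candidate.isArray && expected.isArray &&
      expected.getComponentType == classOf[Object] // true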

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 0b244e9..8509a8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -61,7 +61,8 @@ trait FlinkRelNode extends RelNode {
         val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
         val field = fa.getField.getName
         s"$referenceExpr.$field"
-
+      case cv: RexCorrelVariable =>
+        cv.toString
       case _ =>
         throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index fad60fd..980dfd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -100,6 +100,9 @@ object FlinkRuleSets {
     PushProjectIntoTableSourceScanRule.INSTANCE,
     PushFilterIntoTableSourceScanRule.INSTANCE,
 
+    // Unnest rule
+    LogicalUnnestRule.INSTANCE,
+
     // translate to flink logical rel nodes
     FlinkLogicalAggregate.CONVERTER,
     FlinkLogicalWindowAggregate.CONVERTER,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
new file mode 100644
index 0000000..f2d9f2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util.Collections
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.Uncollect
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.sql.`type`.AbstractSqlType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.util.ExplodeFunctionUtil
+
+class LogicalUnnestRule(
+    operand: RelOptRuleOperand,
+    description: String)
+  extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+    right match {
+      // a filter is pushed above the table function
+      case filter: LogicalFilter =>
+        filter.getInput.asInstanceOf[RelSubset].getOriginal match {
+          case u: Uncollect => !u.withOrdinality
+          case _ => false
+        }
+      case u: Uncollect => !u.withOrdinality
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val correlate = call.rel(0).asInstanceOf[LogicalCorrelate]
+
+    val outer = correlate.getLeft.asInstanceOf[RelSubset].getOriginal
+    val array = correlate.getRight.asInstanceOf[RelSubset].getOriginal
+
+    def convert(relNode: RelNode): RelNode = {
+      relNode match {
+        case rs: RelSubset =>
+          convert(rs.getRelList.get(0))
+
+        case f: LogicalFilter =>
+          f.copy(
+            f.getTraitSet,
+            ImmutableList.of(convert(f.getInput.asInstanceOf[RelSubset].getOriginal)))
+
+        case uc: Uncollect =>
+          // convert Uncollect into TableFunctionScan
+          val cluster = correlate.getCluster
+
+          val arrayType =
+            uc.getInput.getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType]
+          val componentType = arrayType.getComponentType
+
+          // create table function
+          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+            "explode",
+            ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
+            FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
+            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
+
+          // create table function call
+          val rexCall = cluster.getRexBuilder.makeCall(
+            explodeTableFunc,
+            uc.getInput.asInstanceOf[RelSubset]
+              .getOriginal.asInstanceOf[LogicalProject].getChildExps
+          )
+
+          // determine rel data type of unnest
+          val rowType = componentType match {
+            case _: AbstractSqlType =>
+              new RelRecordType(
+                StructKind.FULLY_QUALIFIED,
+                ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType)))
+            case _: RelRecordType => componentType
+            case _ => throw TableException(
+              s"Unsupported array component type in UNNEST: ${componentType.toString}")
+          }
+
+          // create table function scan
+          new LogicalTableFunctionScan(
+            cluster,
+            correlate.getTraitSet,
+            Collections.emptyList(),
+            rexCall,
+            classOf[Array[Object]],
+            rowType,
+            null)
+      }
+    }
+
+    // convert unnest into table function scan
+    val tableFunctionScan = convert(array)
+    // create correlate with table function scan as input
+    val newCorrelate =
+      correlate.copy(correlate.getTraitSet, ImmutableList.of(outer, tableFunctionScan))
+    call.transformTo(newCorrelate)
+  }
+}
+
+object LogicalUnnestRule {
+  val INSTANCE = new LogicalUnnestRule(
+    operand(classOf[LogicalCorrelate], any),
+    "LogicalUnnestRule")
+}
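
In plan terms, the rule's effect can be sketched as follows (shapes simplified; a LogicalFilter sitting on top of the Uncollect is preserved):

    before: LogicalCorrelate(outer, Uncollect(LogicalProject(arrayExpr)))
    after:  LogicalCorrelate(outer, LogicalTableFunctionScan(explode(arrayExpr)))

The generated explode table function emits one row per array element, and the correlate joins those rows back to the outer input.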

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
new file mode 100644
index 0000000..1bcc6d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.functions.TableFunction
+
+class ObjectExplodeTableFunc extends TableFunction[Object] {
+  def eval(arr: Array[Object]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class FloatExplodeTableFunc extends TableFunction[Float] {
+  def eval(arr: Array[Float]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ShortExplodeTableFunc extends TableFunction[Short] {
+  def eval(arr: Array[Short]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class IntExplodeTableFunc extends TableFunction[Int] {
+  def eval(arr: Array[Int]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class LongExplodeTableFunc extends TableFunction[Long] {
+  def eval(arr: Array[Long]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class DoubleExplodeTableFunc extends TableFunction[Double] {
+  def eval(arr: Array[Double]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ByteExplodeTableFunc extends TableFunction[Byte] {
+  def eval(arr: Array[Byte]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class BooleanExplodeTableFunc extends TableFunction[Boolean] {
+  def eval(arr: Array[Boolean]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+object ExplodeFunctionUtil {
+  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = {
+    ti match {
+      case pat: PrimitiveArrayTypeInfo[_] => {
+        pat.getComponentType match {
+          case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+          case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+          case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
+          case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
+          case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
+          case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
+          case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
+        }
+      }
+      case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _ => throw new UnsupportedOperationException(ti.toString + " is not supported")
+    }
+  }
+}
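
A brief usage sketch (hypothetical, not part of the commit): the planner picks the explode function that matches the array's component type, and at runtime each eval call collects one row per element:

    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo

    val explode = ExplodeFunctionUtil.explodeTableFuncFromType(
      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO) // an IntExplodeTableFunc
    // At runtime, explode.eval(Array(1, 2, 3)) would collect the rows 1, 2 and 3.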

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 9896a8c..fea8c2a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -68,7 +68,9 @@ object TypeCheckUtils {
   def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
 
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
+    case _: ObjectArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_] |
+         _: BasicArrayTypeInfo[_, _] => true
     case _ => false
   }
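
With this change, for example, isArray(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO) now returns true, so String arrays pass the same checks as primitive and object arrays.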
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 9df17ad..8a8c0ce 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -375,6 +376,59 @@ class JoinITCase(
     Assert.assertEquals(0, result)
   }
 
+  @Test
+  def testCrossWithUnnest(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataSet("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
+    val results = result.toDataSet[Row].collect().toList
+    assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
+  }
+
+  @Test
+  def testJoinWithUnnestOfTuple(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      (1, Array((12, "45.6"), (2, "45.612"))),
+      (2, Array((13, "41.6"), (1, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataSet("T", stream, 'a, 'b)
+
+    val sqlQuery = "" +
+      "SELECT a, b, x, y " +
+      "FROM " +
+      "  (SELECT a, b FROM T WHERE a < 3) as tf, " +
+      "  UNNEST(tf.b) as A (x, y) " +
+      "WHERE x > a"
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = List(
+      "1,[(12,45.6), (2,45.612)],12,45.6",
+      "1,[(12,45.6), (2,45.612)],2,45.612",
+      "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ")
+    val results = result.toDataSet[Row].collect().map(_.toString)
+    assertEquals(expected, results.sorted.mkString(", "))
+  }
+
   @Test(expected = classOf[TableException])
   def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 95366e1..4147358 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -190,4 +190,94 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45],12",
+      "1,[12, 45],45",
+      "2,[41, 5],41",
+      "2,[41, 5],5",
+      "3,[18, 42],18",
+      "3,[18, 42],42"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestArrayOfArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45]",
+      "2,[18]",
+      "2,[87]",
+      "3,[1]",
+      "3,[45]")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b)
+
+    val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "2,[(13,41.6), (14,45.2136)],14,45.2136",
+      "3,[(18,42.6)],18,42.6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
+


[3/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.

Posted by fh...@apache.org.
[FLINK-6257] [table] Refactor OVER window tests.

This closes #3697.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f2293cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f2293cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f2293cf

Branch: refs/heads/master
Commit: 9f2293cfdab960246fe1aea1d705eea18a011761
Parents: e5b65a7
Author: sunjincheng121 <su...@gmail.com>
Authored: Fri Apr 7 19:28:19 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200

----------------------------------------------------------------------
 .../api/scala/stream/sql/OverWindowITCase.scala | 878 ++++++++++++++++
 .../api/scala/stream/sql/OverWindowTest.scala   | 503 ++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 992 +------------------
 .../scala/stream/sql/WindowAggregateTest.scala  | 380 +------
 .../table/runtime/harness/HarnessTestBase.scala |  87 ++
 .../runtime/harness/OverWindowHarnessTest.scala | 974 ++++++++++++++++++
 6 files changed, 2444 insertions(+), 1370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..a7fe1c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+  /**
+    * All aggregates must be computed on the same window.
+    */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+      "FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+      "  MIN(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+      "FROM MyTable"
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,0",
+      "2,3,0",
+      "3,6,0",
+      "3,10,0",
+      "3,15,0",
+      "4,21,0",
+      "4,28,0",
+      "4,36,0",
+      "4,45,0",
+      "5,55,0",
+      "5,66,1",
+      "5,77,2",
+      "5,88,3",
+      "5,99,4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // ensure a deterministic element order so that the sum aggregation is reproducible
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) " +
+      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    // ensure a deterministic element order so that the sum aggregation is reproducible
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val data = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      " c, a, " +
+      "  COUNT(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15",
+      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
+      " FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+    val data = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
+      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
+      "FROM T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Left(14000021L, (1, 6L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000024L, (3, 5L, "Hello world")),
+      Left(14000026L, (1, 7L, "Hello world")),
+      Left(14000025L, (1, 8L, "Hello world")),
+      Left(14000022L, (1, 9L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+             .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "2,1,Hello,1,1,1,1,1",
+      "1,1,Hello,7,4,1,3,1",
+      "1,2,Hello,7,4,1,3,1",
+      "1,3,Hello world,7,4,1,3,1",
+      "2,2,Hello world,12,6,2,3,1",
+      "2,3,Hello world,12,6,2,3,1",
+      "1,1,Hi,13,7,1,3,1",
+      "1,4,Hello world,17,8,2,4,1",
+      "1,5,Hello world,35,11,3,7,1",
+      "1,6,Hello world,35,11,3,7,1",
+      "1,7,Hello world,35,11,3,7,1",
+      "2,4,Hello world,44,13,3,7,1",
+      "2,5,Hello world,44,13,3,7,1")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      Left(14000004L, (3, 7L, "Hello world")),
+      Left(14000007L, (4, 9L, "Hello world")),
+      Left(14000008L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      // this element will be discarded because it is late
+      Left(14000008L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env
+      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello,2,1,2,2,2",
+      "3,5,Hello,7,2,3,5,2",
+      "1,3,Hello,10,3,3,5,2",
+      "3,7,Hello world,17,4,4,7,2",
+      "1,1,Hi,18,5,3,7,1",
+      "4,9,Hello world,27,6,4,9,1",
+      "5,8,Hello world,35,7,5,9,1",
+      "6,8,Hello world,43,8,5,9,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+  /** Test sliding event-time unbounded window with partition by. */
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      // the next 3 elements are late
+      Left(14000008L, (1, 4L, "Hello world")),
+      Left(14000008L, (2, 3L, "Hello world")),
+      Left(14000008L, (3, 3L, "Hello world")),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      // the next 3 elements are late
+      Left(14000019L, (1, 6L, "Hello world")),
+      Left(14000018L, (2, 4L, "Hello world")),
+      Left(14000018L, (3, 4L, "Hello world")),
+      Left(14000022L, (2, 5L, "Hello world")),
+      Left(14000022L, (3, 5L, "Hello world")),
+      Left(14000024L, (1, 7L, "Hello world")),
+      Left(14000023L, (1, 8L, "Hello world")),
+      Left(14000021L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object OverWindowITCase {
+
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    override def cancel(): Unit = ???
+  }
+
+}
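
A word on the harness above: EventTimeSourceFunction drives event time
explicitly. Each Left((timestamp, record)) element is emitted through
collectWithTimestamp, each Right(watermark) advances event time through
emitWatermark, and a record whose timestamp lies behind the latest watermark
is treated as late and dropped by the rowtime OVER aggregation, which is why
the elements flagged as late in the data sets above never appear in the
expected results. A minimal sketch of the pattern (invented timestamps and
values, reusing the env/tEnv setup of the tests above):

    val data = Seq(
      Left(1000L, (1, 10L, "Hi")),  // record with event-time timestamp 1000
      Right(2000L),                 // watermark: event time advances to 2000
      Left(1500L, (1, 20L, "Hi")),  // late (1500 < 2000): dropped
      Left(3000L, (1, 30L, "Hi")),
      Right(4000L))

    val t = env
      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    tEnv.registerTable("T1", t)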

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
new file mode 100644
index 0000000..711b31b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+    "MyTable",
+    'a, 'b, 'c,
+    'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+
+    val sqlQuery =
+      "SELECT a, " +
+        "  AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
+        "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
+        "FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "proctime",
+            "COUNT(c) AS w0$o0",
+            "$SUM0(c) AS w0$o1"
+          )
+        ),
+        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRangeOver(): Unit = {
+
+    val sqlQuery =
+      "SELECT a, " +
+        "  COUNT(c) OVER (ORDER BY proctime " +
+        "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+      "FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS $1")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          streamTableNode(0),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          streamTableNode(0),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
+      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime " +
+      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "$SUM0(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "c",
+          "w0$o0 AS cnt1",
+          "CASE(>(w0$o0, 0)",
+          "CAST(w0$o1), null) AS cnt2"
+        )
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "$SUM0(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "c",
+          "w0$o0 AS cnt1",
+          "CASE(>(w0$o0, 0)",
+          "CAST(w0$o1), null) AS cnt2"
+        )
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+}
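
A note on reading these plan tests: streamUtil.verifySql(sql, expected)
optimizes the query and compares the printed plan with the string assembled
from the unaryNode/term/streamTableNode helpers of TableTestUtil. The
recurring pair COUNT(a) AS w0$o0 and $SUM0(a) AS w0$o1, combined with
CASE(>(w0$o0, 0), CAST(w0$o1), null), is Calcite's standard expansion of a
windowed SUM: $SUM0 returns 0 over an empty frame, and the surrounding CASE
restores SQL's NULL-on-empty-frame semantics. A small Scala analogue of the
expansion (illustrative only, not planner output):

    // count/sum over one window frame, mirroring w0$o0 and w0$o1
    def sumOverFrame(frame: Seq[Long]): Option[Long] = {
      val cnt  = frame.size  // COUNT(a)  -> w0$o0
      val sum0 = frame.sum   // $SUM0(a)  -> w0$o1, 0 on an empty frame
      if (cnt > 0) Some(sum0) else None  // CASE(>(w0$o0, 0), ..., null)
    }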

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 249d505..95366e1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,34 +19,16 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-
-import scala.collection.mutable
 
 class SqlITCase extends StreamingWithStateTestBase {
 
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
   /** test unbounded groupby (without window) **/
   @Test
   def testUnboundedGroupby(): Unit = {
@@ -208,976 +190,4 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // for sum aggregation ensure that every time the order of each element is consistent
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,1", "Hello World,2", "Hello World,3",
-      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRange(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // for sum aggregation ensure that every time the order of each element is consistent
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundNonPartitionedEventTimeWindowWithRow(): Unit = {
-
-    val data = Seq(
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))), // early row
-      Right(3L),
-      Left((2L, (2L, 2, "Hello"))), // late row
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(7L),
-      Left((9L, (9L, 9, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
-      "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7",
-      "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15", "Hello World,7,3,18",
-      "Hello World,8,3,21", "Hello World,8,3,23",
-      "Hello World,9,3,25",
-      "Hello World,20,3,37")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, b, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      " preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, b, " +
-      "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      "preceding AND CURRENT ROW)" +
-      ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
-      " preceding AND CURRENT ROW)" +
-      " from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /**
-    * All aggregates must be computed on the same window.
-    */
-  @Test(expected = classOf[TableException])
-  def testMultiWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-  }
-
-  /** test sliding event-time unbounded window with partition by **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      // the next 3 elements are late
-      Left(14000008L, (1, 4L, "Hello world")),
-      Left(14000008L, (2, 3L, "Hello world")),
-      Left(14000008L, (3, 3L, "Hello world")),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      // the next 3 elements are late
-      Left(14000019L, (1, 6L, "Hello world")),
-      Left(14000018L, (2, 4L, "Hello world")),
-      Left(14000018L, (3, 4L, "Hello world")),
-      Left(14000022L, (2, 5L, "Hello world")),
-      Left(14000022L, (3, 5L, "Hello world")),
-      Left(14000024L, (1, 7L, "Hello world")),
-      Left(14000023L, (1, 8L, "Hello world")),
-      Left(14000021L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window with partition by **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Left(14000024L, (3, 5L, "Hello world")),
-      Left(14000026L, (1, 7L, "Hello world")),
-      Left(14000025L, (1, 8L, "Hello world")),
-      Left(14000022L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window without partition by **/
-  @Test
-  def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      Left(14000004L, (3, 7L, "Hello world")),
-      Left(14000007L, (4, 9L, "Hello world")),
-      Left(14000008L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      // this element will be discarded because it is late
-      Left(14000008L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "3,7,Hello world,17,4,4,7,2",
-      "1,1,Hi,18,5,3,7,1",
-      "4,9,Hello world,27,6,4,9,1",
-      "5,8,Hello world,35,7,5,9,1",
-      "6,8,Hello world,43,8,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded window without partition by, with early arriving rows **/
-  @Test
-  def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      // next three elements are early
-      Left(14000012L, (3, 7L, "Hello world")),
-      Left(14000013L, (4, 9L, "Hello world")),
-      Left(14000014L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      Left(14000011L, (6, 8L, "Hello world")),
-      // next element is early
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "1,1,Hi,11,4,2,5,1",
-      "6,8,Hello world,19,5,3,8,1",
-      "3,7,Hello world,26,6,4,8,1",
-      "4,9,Hello world,35,7,5,9,1",
-      "5,8,Hello world,43,8,5,9,1",
-      "6,8,Hello world,51,9,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time non-partitioned unbounded RANGE window **/
-  @Test
-  def testUnboundedNonPartitionedEventTimeRangeWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime range between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime range between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "2,1,Hello,1,1,1,1,1",
-      "1,1,Hello,7,4,1,3,1",
-      "1,2,Hello,7,4,1,3,1",
-      "1,3,Hello world,7,4,1,3,1",
-      "2,2,Hello world,12,6,2,3,1",
-      "2,3,Hello world,12,6,2,3,1",
-      "1,1,Hi,13,7,1,3,1",
-      "1,4,Hello world,17,8,2,4,1",
-      "1,5,Hello world,35,11,3,7,1",
-      "1,6,Hello world,35,11,3,7,1",
-      "1,7,Hello world,35,11,3,7,1",
-      "2,4,Hello world,44,13,3,7,1",
-      "2,5,Hello world,44,13,3,7,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test sliding event-time unbounded RANGE window **/
-  @Test
-  def testUnboundedPartitionedEventTimeRangeWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime range between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,1,Hello,6,3,2,3,1",
-      "1,2,Hello,6,3,2,3,1",
-      "1,3,Hello world,6,3,2,3,1",
-      "1,1,Hi,7,4,1,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,6,3,2,3,1",
-      "2,3,Hello world,6,3,2,3,1",
-      "1,4,Hello world,11,5,2,4,1",
-      "1,5,Hello world,29,8,3,7,1",
-      "1,6,Hello world,29,8,3,7,1",
-      "1,7,Hello world,29,8,3,7,1",
-      "2,4,Hello world,15,5,3,5,1",
-      "2,5,Hello world,15,5,3,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testPartitionedProcTimeOverWindow(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,24,7",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,36,11",
-      "5,39,12")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testPartitionedProcTimeOverWindow2(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test
-  def testNonPartitionedProcTimeOverWindow(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,1",
-      "3,9,2",
-      "3,12,3",
-      "4,15,4",
-      "4,18,5",
-      "4,21,6",
-      "4,24,7",
-      "5,27,8",
-      "5,30,9",
-      "5,33,10",
-      "5,36,11",
-      "5,39,12")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNonPartitionedProcTimeOverWindow2(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a,  " +
-      " SUM(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
-      " MIN(c) OVER (" +
-      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
-      " FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,0",
-      "3,10,0",
-      "3,15,0",
-      "4,21,0",
-      "4,28,0",
-      "4,36,0",
-      "4,45,0",
-      "5,55,0",
-      "5,66,1",
-      "5,77,2",
-      "5,88,3",
-      "5,99,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
-
-object SqlITCase {
-
-  class EventTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = ???
-  }
 }
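
One worked example helps when checking the long expected lists in the tests
moved by this refactoring: a frame of ROWS BETWEEN 2 PRECEDING AND CURRENT
ROW covers at most three rows per partition, so the running count grows 1,
2, 3 and then stays at 3 while the frame slides. A sketch with the first
four "Hello" rows (a = 1, 1, 1, 2) of testBoundPartitionedEventTimeWindowWithRow:

    val as = Seq(1L, 1L, 1L, 2L)  // values of a, in rowtime order
    val perRow = as.indices.map { i =>
      val frame = as.slice(math.max(0, i - 2), i + 1)  // at most 3 rows
      (frame.size, frame.sum)                          // (count(a), sum(a))
    }
    // perRow == Vector((1,1), (2,2), (3,3), (3,4)), matching the expected
    // rows "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3", "Hello,2,3,4"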


[2/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 4c1d6e6..125d071 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -32,64 +32,9 @@ class WindowAggregateTest extends TableTestBase {
     "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
   @Test
-  def testNonPartitionedProcessingTimeBoundedWindow() = {
-
-    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime  " +
-      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
-      "FROM MyTable"
-      val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
-        ),
-        term("select", "a", "w0$o0 AS $1")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPartitionedProcessingTimeBoundedWindow() = {
-
-    val sqlQuery =
-      "SELECT a, " +
-      "  AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
-      "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
-      "FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy","a"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
-        ),
-        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
   def testGroupbyWithoutWindow() = {
     val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -241,327 +186,4 @@ class WindowAggregateTest extends TableTestBase {
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedEventTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundPartitionedEventTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedRowTimeWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundNonPartitionedRowTimeWindowWithRow() = {
-    val sql = "SELECT " +
-        "c, " +
-        "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-        "CURRENT ROW) as cnt1 " +
-        "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedRowTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundNonPartitionedRowTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
- @Test
-  def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedProcTimeWindowWithRowRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
new file mode 100644
index 0000000..eb5acd5b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.{Comparator, Queue => JQueue}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.runtime.types.CRow
+
+class HarnessTestBase {
+  def createHarnessTester[IN, OUT, KEY](
+    operator: OneInputStreamOperator[IN, OUT],
+    keySelector: KeySelector[IN, KEY],
+    keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+    new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
+  }
+
+  def verify(
+    expected: JQueue[Object],
+    actual: JQueue[Object],
+    comparator: Comparator[Object],
+    checkWaterMark: Boolean = false): Unit = {
+    if (!checkWaterMark) {
+      // Drop watermarks through the iterator so the queue is not
+      // mutated while it is being traversed.
+      val it = actual.iterator()
+      while (it.hasNext) {
+        if (it.next().isInstanceOf[Watermark]) {
+          it.remove()
+        }
+      }
+    }
+    TestHarnessUtil.assertOutputEqualsSorted("Verify Error...", expected, actual, comparator)
+  }
+}
+
+object HarnessTestBase {
+
+  /**
+    * Returns 0 for equal rows and a non-zero value for different rows.
+    */
+  class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+
+    override def compare(o1: Object, o2: Object): Int = {
+
+      if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+        // watermark is not expected
+        -1
+      } else {
+        val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+        val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
+        row1.toString.compareTo(row2.toString)
+      }
+    }
+  }
+
+  /**
+    * Key selector that uses the value of a specified field of the input row as the key.
+    */
+  class TupleRowKeySelector[T](
+    private val selectorField: Int) extends KeySelector[CRow, T] {
+
+    override def getKey(value: CRow): T = {
+      value.row.getField(selectorField).asInstanceOf[T]
+    }
+  }
+
+}
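
A minimal usage sketch of the base class just added (the EchoFunction below is
hypothetical; everything else uses the classes from this file): wrap a keyed
operator in a harness, push timestamped CRows through it, and compare the
collected output against an expected queue with the sort comparator.

  import java.lang.{Long => JLong}
  import java.util.concurrent.ConcurrentLinkedQueue

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.streaming.api.functions.ProcessFunction
  import org.apache.flink.streaming.api.operators.KeyedProcessOperator
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
  import org.apache.flink.table.runtime.harness.HarnessTestBase
  import org.apache.flink.table.runtime.harness.HarnessTestBase._
  import org.apache.flink.table.runtime.types.CRow
  import org.apache.flink.types.Row
  import org.apache.flink.util.Collector

  // Hypothetical pass-through function: emits every input row unchanged.
  class EchoFunction extends ProcessFunction[CRow, CRow] {
    override def processElement(
        value: CRow,
        ctx: ProcessFunction[CRow, CRow]#Context,
        out: Collector[CRow]): Unit = out.collect(value)
  }

  object HarnessUsageSketch extends HarnessTestBase {
    def main(args: Array[String]): Unit = {
      val harness = createHarnessTester(
        new KeyedProcessOperator[String, CRow, CRow](new EchoFunction),
        new TupleRowKeySelector[String](0),
        BasicTypeInfo.STRING_TYPE_INFO)
      harness.open()

      harness.processElement(new StreamRecord(
        CRow(Row.of("aaa", 1L: JLong), true), 1))

      val expected = new ConcurrentLinkedQueue[Object]()
      expected.add(new StreamRecord(
        CRow(Row.of("aaa", 1L: JLong), true), 1))

      verify(expected, harness.getOutput, new RowResultSortComparator(0))
      harness.close()
    }
  }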

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
new file mode 100644
index 0000000..56ca85c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -0,0 +1,974 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.codegen.GeneratedAggregationsFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.harness.HarnessTestBase._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowHarnessTest extends HarnessTestBase {
+
+  private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+    INT_TYPE_INFO,
+    LONG_TYPE_INFO,
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    LONG_TYPE_INFO),
+    Array("a", "b", "c", "d", "e"))
+
+  private val cRT = new CRowTypeInfo(rT)
+
+  private val aggregates =
+    Array(new LongMinWithRetractAggFunction,
+      new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+  private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
+
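+  // Hand-written stand-in for the code-generated GeneratedAggregations class:
+  // it binds the two retractable MIN/MAX functions (deserialized from the
+  // base64 strings below) to accumulator fields 0 and 1 and writes their
+  // results into fields 5 and 6 of the 7-field output row.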
+  val funcCode: String =
+    """
+      |public class BoundedOverAggregateHelper
+      |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+      |    fmin = null;
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+      |    fmax = null;
+      |
+      |  public BoundedOverAggregateHelper() throws Exception {
+      |
+      |    fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |
+      |    fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |  }
+      |
+      |  public void setAggregationResults(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmin;
+      |    output.setField(5, baseClass0.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)));
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmax;
+      |    output.setField(6, baseClass1.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)));
+      |  }
+      |
+      |  public void accumulate(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public void retract(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createAccumulators() {
+      |
+      |    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
+      |
+      |    accs.setField(
+      |      0,
+      |      fmin.createAccumulator());
+      |
+      |    accs.setField(
+      |      1,
+      |      fmax.createAccumulator());
+      |
+      |      return accs;
+      |  }
+      |
+      |  public void setForwardedFields(
+      |    org.apache.flink.types.Row input,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    output.setField(0, input.getField(0));
+      |    output.setField(1, input.getField(1));
+      |    output.setField(2, input.getField(2));
+      |    output.setField(3, input.getField(3));
+      |    output.setField(4, input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createOutputRow() {
+      |    return new org.apache.flink.types.Row(7);
+      |  }
+      |
+      |/*******  This test does not use the following methods  *******/
+      |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+      |    org.apache.flink.types.Row a,
+      |    org.apache.flink.types.Row b) {
+      |    return null;
+      |  }
+      |
+      |  public void resetAccumulator(org.apache.flink.types.Row accs) {
+      |  }
+      |
+      |  public void setConstantFlags(org.apache.flink.types.Row output) {
+      |  }
+      |}
+    """.stripMargin
+
+
+  private val funcName = "BoundedOverAggregateHelper"
+
+  private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
+
+
+  @Test
+  def testProcTimeBoundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new ProcTimeBoundedRowsOver(
+        genAggFunction,
+        2,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
+
+    testHarness.setProcessingTime(2)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  /**
+    * NOTE: because this window is RANGE-based over processing time, all
+    * elements with the same processing timestamp receive the same aggregate
+    * result per key.
+    */
+  @Test
+  def testProcTimeBoundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new ProcTimeBoundedRangeOver(
+        genAggFunction,
+        1000,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(3)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+
+    testHarness.setProcessingTime(4)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+
+    testHarness.setProcessingTime(6)
+
+    testHarness.setProcessingTime(1002)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1004)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1005)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1006)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  @Test
+  def testProcTimeUnboundedOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new ProcTimeUnboundedPartitionedOver(
+        genAggFunction,
+        aggregationStateType))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  /**
+    * NOTE: because this window is RANGE-based over row-time, all elements
+    * with the same row-time receive the same aggregate result per key.
+    */
+  @Test
+  def testRowTimeBoundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRangeOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT,
+        4000))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(1)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2))
+
+    testHarness.processWatermark(2)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+
+    testHarness.processWatermark(4001)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4002))
+
+    testHarness.processWatermark(4002)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4002))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeBoundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRowsOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT,
+        3))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  /**
+    * NOTE: because this window is RANGE-based over row-time, all elements
+    * with the same row-time receive the same aggregate result per key.
+    */
+  @Test
+  def testRowTimeUnboundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRangeOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeUnboundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRowsOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+}
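
Not part of the commit, but a handy plain-Scala cross-check for the expected
MIN/MAX columns above: in testRowTimeBoundedRowsOver, each expected min/max is
computed over the current row and at most the two preceding rows of the same
key. The values below are the "aaa" rows of that test:

  object BoundedRowsCheck {
    def main(args: Array[String]): Unit = {
      val values = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)
      // For each element, look back at most two rows (ROWS-based window).
      val minMax = values.indices.map { i =>
        val window = values.slice(math.max(0, i - 2), i + 1)
        (values(i), window.min, window.max)
      }
      // Prints (1,1,1), (2,1,2), (3,1,3), (4,2,4), ..., (10,8,10),
      // matching the expected min/max columns of the test.
      minMax.foreach(println)
    }
  }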