You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/07 12:18:29 UTC

[2/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 4c1d6e6..125d071 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -32,64 +32,9 @@ class WindowAggregateTest extends TableTestBase {
     "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
   @Test
-  def testNonPartitionedProcessingTimeBoundedWindow() = {
-
-    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime  " +
-      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
-      "FROM MyTable"
-      val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
-        ),
-        term("select", "a", "w0$o0 AS $1")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPartitionedProcessingTimeBoundedWindow() = {
-
-    val sqlQuery =
-      "SELECT a, " +
-      "  AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
-      "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
-      "FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy","a"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
-        ),
-        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
   def testGroupbyWithoutWindow() = {
     val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -241,327 +186,4 @@ class WindowAggregateTest extends TableTestBase {
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundPartitionedProcessingWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedProcessingWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundNonPartitionedEventTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testUnboundPartitionedEventTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedRowTimeWindowWithRow() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundNonPartitionedRowTimeWindowWithRow() = {
-    val sql = "SELECT " +
-        "c, " +
-        "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-        "CURRENT ROW) as cnt1 " +
-        "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedRowTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundNonPartitionedRowTimeWindowWithRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
- @Test
-  def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testBoundPartitionedProcTimeWindowWithRowRange() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
new file mode 100644
index 0000000..eb5acd5b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.{Comparator, Queue => JQueue}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.runtime.types.CRow
+
+class HarnessTestBase {
+  def createHarnessTester[IN, OUT, KEY](
+    operator: OneInputStreamOperator[IN, OUT],
+    keySelector: KeySelector[IN, KEY],
+    keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+    new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
+  }
+
+  def verify(
+    expected: JQueue[Object],
+    actual: JQueue[Object],
+    comparator: Comparator[Object],
+    checkWaterMark: Boolean = false): Unit = {
+    if (!checkWaterMark) {
+      val it = actual.iterator()
+      while (it.hasNext) {
+        val data = it.next()
+        if (data.isInstanceOf[Watermark]) {
+          actual.remove(data)
+        }
+      }
+    }
+    TestHarnessUtil.assertOutputEqualsSorted("Verify Error...", expected, actual, comparator)
+  }
+}
+
+object HarnessTestBase {
+
+  /**
+    * Return 0 for equal Rows and non zero for different rows
+    */
+  class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+
+    override def compare(o1: Object, o2: Object): Int = {
+
+      if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+        // watermark is not expected
+        -1
+      } else {
+        val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+        val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
+        row1.toString.compareTo(row2.toString)
+      }
+    }
+  }
+
+  /**
+    * Tuple row key selector that returns a specified field as the selector function
+    */
+  class TupleRowKeySelector[T](
+    private val selectorField: Int) extends KeySelector[CRow, T] {
+
+    override def getKey(value: CRow): T = {
+      value.row.getField(selectorField).asInstanceOf[T]
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
new file mode 100644
index 0000000..56ca85c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -0,0 +1,974 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.codegen.GeneratedAggregationsFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.harness.HarnessTestBase._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowHarnessTest extends HarnessTestBase{
+
+  private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+    INT_TYPE_INFO,
+    LONG_TYPE_INFO,
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    LONG_TYPE_INFO),
+    Array("a", "b", "c", "d", "e"))
+
+  private val cRT = new CRowTypeInfo(rT)
+
+  private val aggregates =
+    Array(new LongMinWithRetractAggFunction,
+      new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+  private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
+
+  val funcCode: String =
+    """
+      |public class BoundedOverAggregateHelper
+      |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+      |    fmin = null;
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+      |    fmax = null;
+      |
+      |  public BoundedOverAggregateHelper() throws Exception {
+      |
+      |    fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |
+      |    fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |  }
+      |
+      |  public void setAggregationResults(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmin;
+      |    output.setField(5, baseClass0.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)));
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmax;
+      |    output.setField(6, baseClass1.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)));
+      |  }
+      |
+      |  public void accumulate(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public void retract(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createAccumulators() {
+      |
+      |    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
+      |
+      |    accs.setField(
+      |      0,
+      |      fmin.createAccumulator());
+      |
+      |    accs.setField(
+      |      1,
+      |      fmax.createAccumulator());
+      |
+      |      return accs;
+      |  }
+      |
+      |  public void setForwardedFields(
+      |    org.apache.flink.types.Row input,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    output.setField(0, input.getField(0));
+      |    output.setField(1, input.getField(1));
+      |    output.setField(2, input.getField(2));
+      |    output.setField(3, input.getField(3));
+      |    output.setField(4, input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createOutputRow() {
+      |    return new org.apache.flink.types.Row(7);
+      |  }
+      |
+      |/*******  This test does not use the following methods  *******/
+      |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+      |    org.apache.flink.types.Row a,
+      |    org.apache.flink.types.Row b) {
+      |    return null;
+      |  }
+      |
+      |  public void resetAccumulator(org.apache.flink.types.Row accs) {
+      |  }
+      |
+      |  public void setConstantFlags(org.apache.flink.types.Row output) {
+      |  }
+      |}
+    """.stripMargin
+
+
+  private val funcName = "BoundedOverAggregateHelper"
+
+  private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
+
+
+  @Test
+  def testProcTimeBoundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeBoundedRowsOver(
+        genAggFunction,
+        2,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo
+        .INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
+
+    testHarness.setProcessingTime(2)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  /**
+    * NOTE: all elements at the same proc timestamp have the same value per key
+    */
+  @Test
+  def testProcTimeBoundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeBoundedRangeOver(
+        genAggFunction,
+        1000,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(3)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+
+    testHarness.setProcessingTime(4)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+
+    testHarness.setProcessingTime(6)
+
+    testHarness.setProcessingTime(1002)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1004)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1005)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1006)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  @Test
+  def testProcTimeUnboundedOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeUnboundedPartitionedOver(
+        genAggFunction,
+        aggregationStateType))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  /**
+    * all elements at the same row-time have the same value per key
+    */
+  @Test
+  def testRowTimeBoundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRangeOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT,
+        4000))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(1)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2))
+
+    testHarness.processWatermark(2)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+
+    testHarness.processWatermark(4001)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4002))
+
+    testHarness.processWatermark(4002)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 2))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4002))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4003))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeBoundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRowsOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT,
+        3))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  /**
+    * all elements at the same row-time have the same value per key
+    */
+  @Test
+  def testRowTimeUnboundedRangeOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRangeOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeUnboundedRowsOver(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRowsOver(
+        genAggFunction,
+        aggregationStateType,
+        cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](3),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+    testHarness.processWatermark(19000)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+    testHarness.close()
+  }
+}