Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/31 19:52:55 UTC

flink git commit: [FLINK-5654] [table] Add processing-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.

Repository: flink
Updated Branches:
  refs/heads/master a48357db8 -> 31e120a98


[FLINK-5654] [table] Add processing-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.

This closes #3641.
This closes #3590.
This closes #3550.
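
For illustration, a bounded processing-time OVER RANGE query of the shape this commit
enables (written in the style of the new tests below; table and column names are
illustrative) no longer fails with the previously thrown TableException:

    val sqlQuery = "SELECT a, COUNT(c) OVER (PARTITION BY a ORDER BY procTime() " +
      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS cnt " +
      "FROM MyTable"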


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31e120a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31e120a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31e120a9

Branch: refs/heads/master
Commit: 31e120a98da673ee12ae5879d95243fa0b555e00
Parents: a48357d
Author: rtudoran <tu...@ymail.com>
Authored: Wed Mar 29 12:02:11 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 31 21:42:48 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  14 ++
 .../datastream/DataStreamOverAggregate.scala    |  10 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  25 ++-
 ...ndedProcessingOverRangeProcessFunction.scala | 203 ++++++++++++++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  53 +++++
 ...ProcessingOverRangeProcessFunctionTest.scala | 204 +++++++++++++++++++
 6 files changed, 497 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a2945e8..6bcddc2 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -140,6 +140,20 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2df4e02..e15db01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -114,12 +114,14 @@ class DataStreamOverAggregate(
             // ROWS clause bounded OVER window
             createBoundedAndCurrentRowOverWindow(
               inputDS,
-              isRangeClause = true,
+              isRangeClause = false,
               isRowTimeType = false)
           } else {
             // RANGE clause bounded OVER window
-            throw new TableException(
-              "processing-time OVER RANGE PRECEDING window is not supported yet.")
+            createBoundedAndCurrentRowOverWindow(
+              inputDS,
+              isRangeClause = true,
+              isRowTimeType = false)
           }
         } else {
           throw new TableException(
@@ -206,7 +208,7 @@ class DataStreamOverAggregate(
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     val precedingOffset =
-      getLowerBoundary(logicWindow, overWindow, getInput()) + 1
+      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRangeClause) 0 else 1)
 
     // get the output types
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 74dc5cd..caa2818 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -139,13 +139,23 @@ object AggregateUtil {
         )
       }
     } else {
-      new BoundedProcessingOverRowProcessFunction(
-        aggregates,
-        aggFields,
-        precedingOffset,
-        inputType.getFieldCount,
-        aggregationStateType,
-        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      if (isRangeClause) {
+        new BoundedProcessingOverRangeProcessFunction(
+          aggregates,
+          aggFields,
+          inputType.getFieldCount,
+          aggregationStateType,
+          precedingOffset,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      } else {
+        new BoundedProcessingOverRowProcessFunction(
+          aggregates,
+          aggFields,
+          precedingOffset,
+          inputType.getFieldCount,
+          aggregationStateType,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      }
     }
   }
 
@@ -1240,4 +1250,3 @@ object AggregateUtil {
     if (b == 0) a else gcd(b, a % b)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
new file mode 100644
index 0000000..afab11d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * Process function that computes the aggregates of a bounded processing-time OVER RANGE
+ * window on a [[org.apache.flink.streaming.api.datastream.DataStream]].
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ *                   used for this aggregation
+ * @param aggFields  the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount number of fields of the input element forwarded to the output
+ * @param rowTypeInfo the row type information of the accumulator state
+ * @param precedingTimeBoundary the lower window boundary, as a processing-time interval in ms
+ * @param inputType the type information of the input [[Row]]
+ */
+class BoundedProcessingOverRangeProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val forwardedFieldCount: Int,
+  private val rowTypeInfo: RowTypeInfo,
+  private val precedingTimeBoundary: Long,
+  private val inputType: TypeInformation[Row])
+    extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    // We keep the received elements in a MapState, indexed by their processing (arrival) time
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", rowTypeInfo)
+    accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    val currentTime = ctx.timerService.currentProcessingTime
+    // buffer the incoming event
+
+    // add current element to the window list of elements with corresponding timestamp
+    var rowList = rowMapState.get(currentTime)
+    // a null value means that this is the first event received for this timestamp
+    if (rowList == null) {
+      rowList = new ArrayList[Row]()
+      // register a timer to process the event once the current millisecond has passed
+      ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+    }
+    rowList.add(input)
+    rowMapState.put(currentTime, rowList)
+
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    // the timer fires 1 ms after the elements arrived, so process the elements of timestamp - 1
+    val currentTime = timestamp - 1
+    var i = 0
+
+    // initialize the accumulators
+    var accumulators = accumulatorState.value()
+
+    if (null == accumulators) {
+      accumulators = new Row(aggregates.length)
+      i = 0
+      while (i < aggregates.length) {
+        accumulators.setField(i, aggregates(i).createAccumulator())
+        i += 1
+      }
+    }
+
+    // compute the boundary below which elements expire from the window and must be retracted
+    val limit = currentTime - precedingTimeBoundary
+
+    // iterate over all timestamp keys in the window buffer; for keys that have fallen
+    // out of the window, retrieve the corresponding elements and retract them from the
+    // aggregates. Multiple elements may have been received at the same timestamp. The
+    // removal of old elements happens only once per proctime, as onTimer fires only once.
+    val iter = rowMapState.keys.iterator
+    val markToRemove = new ArrayList[Long]()
+    while (iter.hasNext) {
+      val elementKey = iter.next
+      if (elementKey < limit) {
+        // element key outside of window. Retract values
+        val elementsRemove = rowMapState.get(elementKey)
+        var iRemove = 0
+        while (iRemove < elementsRemove.size()) {
+          i = 0
+          while (i < aggregates.length) {
+            val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+            aggregates(i).retract(accumulator, elementsRemove.get(iRemove)
+               .getField(aggFields(i)(0)))
+            i += 1
+          }
+          iRemove += 1
+        }
+        // mark the key for later removal to avoid modifying the MapState while iterating
+        markToRemove.add(elementKey)
+      }
+    }
+    // remove the marked keys in a second step to avoid concurrent access errors on the MapState
+    i = 0
+    while (i < markToRemove.size()) {
+      rowMapState.remove(markToRemove.get(i))
+      i += 1
+    }
+
+    // get the list of elements of current proctime
+    val currentElements = rowMapState.get(currentTime)
+    // add the current elements to the aggregates; multiple elements may have arrived at
+    // the same proctime, in which case they all share the same accumulator values
+    var iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElemenets)
+      i = 0
+      while (i < aggregates.length) {
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        i += 1
+      }
+      iElements += 1
+    }
+
+    // build and emit an output row for every event received at this proctime
+    iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElemenets)
+
+      // copy the forwarded fields of the input event into the output row
+      i = 0
+      while (i < forwardedFieldCount) {
+        output.setField(i, input.getField(i))
+        i += 1
+      }
+
+      // add the accumulator values to the result
+      i = 0
+      while (i < aggregates.length) {
+        val index = forwardedFieldCount + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+      out.collect(output)
+      iElements += 1
+    }
+
+    // update the value of accumulators for future incremental computation
+    accumulatorState.update(accumulators)
+
+  }
+
+}
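
A worked example of the timing semantics above: with precedingTimeBoundary = 1000 ms, an
element arriving at proctime 1005 is buffered under key 1005 and a timer is registered for
1006. When that timer fires, currentTime = 1006 - 1 = 1005 and limit = 1005 - 1000 = 5, so
all buffered keys strictly smaller than 5 are retracted before the aggregates for the
elements of proctime 1005 are emitted with timestamp 1006 (see the harness test below).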

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 52fd5f8..4e0d4fd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -30,6 +30,59 @@ class WindowAggregateTest extends TableTestBase {
   streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
 
   @Test
+  def testNonPartitionedProcessingTimeBoundedWindow() = {
+
+    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+      "FROM MyTable"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS $1")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedProcessingTimeBoundedWindow() = {
+
+    val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
+      "FROM MyTable"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy","a"),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
+        ),
+        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testNonPartitionedTumbleWindow() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
     val expected =

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
new file mode 100644
index 0000000..227bfc7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.Comparator
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class BoundedProcessingOverRangeProcessFunctionTest {
+
+  @Test
+  def testProcTimePartitionedOverRange(): Unit = {
+
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a", "b", "c", "d", "e"))
+
+    val rTA = new RowTypeInfo(Array[TypeInformation[_]](
+      LONG_TYPE_INFO), Array("count"))
+
+    val processFunction = new KeyedProcessOperator[String, Row, Row](
+      new BoundedProcessingOverRangeProcessFunction(
+        Array(new LongMinWithRetractAggFunction, new LongMaxWithRetractAggFunction),
+        Array(Array(4), Array(4)),
+        5,
+        rTA,
+        1000,
+        rT))
+
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+      processFunction,
+      new TupleRowSelector(0),
+      BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    // Time = 3
+    testHarness.setProcessingTime(3)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+
+    // Time = 4
+    testHarness.setProcessingTime(4)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+
+    // Time = 5
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+
+    // Time = 6
+    testHarness.setProcessingTime(6)
+
+    // Time = 1002
+    testHarness.setProcessingTime(1002)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+
+    // Time = 1003
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+
+    // Time = 1004
+    testHarness.setProcessingTime(1004)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+
+    // Time = 1005
+    testHarness.setProcessingTime(1005)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+
+    testHarness.setProcessingTime(1006)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
+        expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+
+  }
+}
+
+object BoundedProcessingOverRangeProcessFunctionTest {
+
+/**
+ * Returns 0 for equal Rows and non-zero for different Rows.
+ */
+class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+
+    override def compare(o1: Object, o2: Object): Int = {
+
+      if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+        // watermarks are not expected
+        -1
+      } else {
+        val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
+        val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+        row1.toString.compareTo(row2.toString)
+      }
+    }
+}
+
+/**
+ * Simple [[KeySelector]] that extracts a specified field of a Row as the key.
+ */
+class TupleRowSelector(
+    private val selectorField: Int) extends KeySelector[Row, Integer] {
+
+  override def getKey(value: Row): Integer = {
+    value.getField(selectorField).asInstanceOf[Integer]
+  }
+}
+
+}