You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/31 19:52:55 UTC
flink git commit: [FLINK-5654] [table] Add processing-time OVER RANGE
BETWEEN x PRECEDING aggregation to SQL.
Repository: flink
Updated Branches:
refs/heads/master a48357db8 -> 31e120a98
[FLINK-5654] [table] Add processing-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.
This closes #3641.
This closes #3590.
This closes #3550.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31e120a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31e120a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31e120a9
Branch: refs/heads/master
Commit: 31e120a98da673ee12ae5879d95243fa0b555e00
Parents: a48357d
Author: rtudoran <tu...@ymail.com>
Authored: Wed Mar 29 12:02:11 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 31 21:42:48 2017 +0200
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 14 ++
.../datastream/DataStreamOverAggregate.scala | 10 +-
.../table/runtime/aggregate/AggregateUtil.scala | 25 ++-
...ndedProcessingOverRangeProcessFunction.scala | 203 ++++++++++++++++++
.../scala/stream/sql/WindowAggregateTest.scala | 53 +++++
...ProcessingOverRangeProcessFunctionTest.scala | 204 +++++++++++++++++++
6 files changed, 497 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a2945e8..6bcddc2 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -140,6 +140,20 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2df4e02..e15db01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -114,12 +114,14 @@ class DataStreamOverAggregate(
// ROWS clause bounded OVER window
createBoundedAndCurrentRowOverWindow(
inputDS,
- isRangeClause = true,
+ isRangeClause = false,
isRowTimeType = false)
} else {
// RANGE clause bounded OVER window
- throw new TableException(
- "processing-time OVER RANGE PRECEDING window is not supported yet.")
+ createBoundedAndCurrentRowOverWindow(
+ inputDS,
+ isRangeClause = true,
+ isRowTimeType = false)
}
} else {
throw new TableException(
@@ -206,7 +208,7 @@ class DataStreamOverAggregate(
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
val precedingOffset =
- getLowerBoundary(logicWindow, overWindow, getInput()) + 1
+ getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRangeClause) 0 else 1)
// get the output types
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 74dc5cd..caa2818 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -139,13 +139,23 @@ object AggregateUtil {
)
}
} else {
- new BoundedProcessingOverRowProcessFunction(
- aggregates,
- aggFields,
- precedingOffset,
- inputType.getFieldCount,
- aggregationStateType,
- FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ if (isRangeClause) {
+ new BoundedProcessingOverRangeProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ precedingOffset,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ } else {
+ new BoundedProcessingOverRowProcessFunction(
+ aggregates,
+ aggFields,
+ precedingOffset,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ }
}
}
@@ -1240,4 +1250,3 @@ object AggregateUtil {
if (b == 0) a else gcd(b, a % b)
}
}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
new file mode 100644
index 0000000..afab11d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * ProcessFunction computing aggregates for a bounded processing-time OVER RANGE window.
+ * Rows are buffered per processing timestamp and emitted by a timer that fires 1 ms later.
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ * used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount number of leading input fields copied unchanged into the output
+ * @param rowTypeInfo type information of the accumulator Row kept in the overState value state
+ * @param precedingTimeBoundary range of the window; rows older than this boundary are retracted
+ * @param inputType type information of the input rows buffered in the rowmapstate map state
+ */
+class BoundedProcessingOverRangeProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val forwardedFieldCount: Int,
+ private val rowTypeInfo: RowTypeInfo,
+ private val precedingTimeBoundary: Long,
+ private val inputType: TypeInformation[Row])
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+ private var output: Row = _
+ private var accumulatorState: ValueState[Row] = _
+ private var rowMapState: MapState[Long, JList[Row]] = _
+
+ override def open(config: Configuration) {
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ // We keep the elements received in a MapState keyed by the processing time of their arrival
+ val rowListTypeInfo: TypeInformation[JList[Row]] =
+ new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+ rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("overState", rowTypeInfo)
+ accumulatorState = getRuntimeContext.getState(stateDescriptor)
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+ // Only buffers the row under the current processing time; results are emitted in onTimer
+ val currentTime = ctx.timerService.currentProcessingTime
+ // buffer the incoming event
+
+ // add current element to the window list of elements with corresponding timestamp
+ var rowList = rowMapState.get(currentTime)
+ // null value means that this is the first event received for this timestamp
+ if (rowList == null) {
+ rowList = new ArrayList[Row]()
+ // register timer to process event once the current millisecond passed
+ ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+ }
+ rowList.add(input)
+ rowMapState.put(currentTime, rowList)
+
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+ // Retracts expired rows, accumulates the rows of the fired timestamp and emits one row each
+ // we consider the original timestamp of events that have registered this time trigger 1 ms ago
+ val currentTime = timestamp - 1
+ var i = 0
+
+ // initialize the accumulators; a null state means no accumulator Row was stored so far
+ var accumulators = accumulatorState.value()
+
+ if (null == accumulators) {
+ accumulators = new Row(aggregates.length)
+ i = 0
+ while (i < aggregates.length) {
+ accumulators.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ // lower bound of the window: buffered rows with a key below this limit are retracted
+ val limit = currentTime - precedingTimeBoundary
+
+ // we iterate through all elements in the window buffer based on timestamp keys
+ // when we find timestamps that are out of interest, we retrieve corresponding elements
+ // and eliminate them. Multiple elements could have been received at the same timestamp
+ // the removal of old elements happens only once per proctime as onTimer is called only once
+ val iter = rowMapState.keys.iterator
+ val markToRemove = new ArrayList[Long]()
+ while (iter.hasNext) {
+ val elementKey = iter.next
+ if (elementKey < limit) {
+ // element key outside of window. Retract values
+ val elementsRemove = rowMapState.get(elementKey)
+ var iRemove = 0
+ while (iRemove < elementsRemove.size()) {
+ i = 0
+ while (i < aggregates.length) {
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).retract(accumulator, elementsRemove.get(iRemove)
+ .getField(aggFields(i)(0)))
+ i += 1
+ }
+ iRemove += 1
+ }
+ // mark element for later removal not to modify the iterator over MapState
+ markToRemove.add(elementKey)
+ }
+ }
+ // need to remove in 2 steps not to have concurrent access errors via iterator to the MapState
+ i = 0
+ while (i < markToRemove.size()) {
+ rowMapState.remove(markToRemove.get(i))
+ i += 1
+ }
+
+ // get the list of elements of current proctime
+ val currentElements = rowMapState.get(currentTime)
+ // add current elements to aggregator. Multiple elements might have arrived in the same proctime
+ // the same accumulator value will be computed for all elements
+ var iElemenets = 0
+ while (iElemenets < currentElements.size()) {
+ val input = currentElements.get(iElemenets)
+ i = 0
+ while (i < aggregates.length) {
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+ i += 1
+ }
+ iElemenets += 1
+ }
+
+ // we need to build the output and emit for every event received at this proctime
+ iElemenets = 0
+ while (iElemenets < currentElements.size()) {
+ val input = currentElements.get(iElemenets)
+
+ // copy the forwarded fields of this event into the reused output row
+ i = 0
+ while (i < forwardedFieldCount) {
+ output.setField(i, input.getField(i))
+ i += 1
+ }
+
+ // append the aggregate values after the forwarded fields
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ out.collect(output)
+ iElemenets += 1
+ }
+
+ // update the value of accumulators for future incremental computation
+ accumulatorState.update(accumulators)
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 52fd5f8..4e0d4fd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -30,6 +30,59 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
@Test
+ def testNonPartitionedProcessingTimeBoundedWindow() = {
+
+ val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
+ "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+ "FROM MyTable"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "PROCTIME() AS $2")
+ ),
+ term("orderBy", "PROCTIME"),
+ term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+ ),
+ term("select", "a", "w0$o0 AS $1")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testPartitionedProcessingTimeBoundedWindow() = {
+
+ val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
+ "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
+ "FROM MyTable"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "PROCTIME() AS $2")
+ ),
+ term("partitionBy","a"),
+ term("orderBy", "PROCTIME"),
+ term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
+ ),
+ term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
def testNonPartitionedTumbleWindow() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
val expected =
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
new file mode 100644
index 0000000..227bfc7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.Comparator
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class BoundedProcessingOverRangeProcessFunctionTest {
+ // Runs the function in a keyed test harness, advancing processing time manually
+ @Test
+ def testProcTimePartitionedOverRange(): Unit = {
+
+ val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ LONG_TYPE_INFO),
+ Array("a", "b", "c", "d", "e"))
+ // NOTE(review): rTA declares one LONG field although two aggregates are used - confirm
+ val rTA = new RowTypeInfo(Array[TypeInformation[_]](
+ LONG_TYPE_INFO), Array("count"))
+ // NOTE(review): operator key type is String while harness and selector use Integer - confirm
+ val processFunction = new KeyedProcessOperator[String, Row, Row](
+ new BoundedProcessingOverRangeProcessFunction(
+ Array(new LongMinWithRetractAggFunction, new LongMaxWithRetractAggFunction),
+ Array(Array(4), Array(4)),
+ 5,
+ rTA,
+ 1000,
+ rT))
+
+ val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+ processFunction,
+ new TupleRowSelector(0),
+ BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ // Time = 3
+ testHarness.setProcessingTime(3)
+ // key = 1
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+ // key = 2
+ testHarness.processElement(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+
+ // Time = 4
+ testHarness.setProcessingTime(4)
+ // key = 1
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+ // key = 2
+ testHarness.processElement(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+
+ // Time = 5
+ testHarness.setProcessingTime(5)
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+
+ // Time = 6 (no new elements; only advances the processing time)
+ testHarness.setProcessingTime(6)
+
+ // Time = 1002
+ testHarness.setProcessingTime(1002)
+ // key = 1
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+ // key = 2
+ testHarness.processElement(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+
+ // Time = 1003
+ testHarness.setProcessingTime(1003)
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+
+ // Time = 1004
+ testHarness.setProcessingTime(1004)
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+
+ // Time = 1005
+ testHarness.setProcessingTime(1005)
+ // key = 1
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+ testHarness.processElement(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+ // key = 2
+ testHarness.processElement(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+
+ testHarness.setProcessingTime(1006)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // rows of one key buffered at the same proc timestamp carry the same min and max values
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
+ expectedOutput.add(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
+ expectedOutput.add(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
+ expectedOutput.add(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
+ expectedOutput.add(new StreamRecord(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
+ expectedOutput.add(new StreamRecord(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
+ expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+
+ }
+}
+
+object BoundedProcessingOverRangeProcessFunctionTest {
+
+/**
+ * Orders stream records by the string form of their Rows (0 when the strings are equal).
+ */
+class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+ // NOTE(review): indexCounter is never read in this class - confirm whether it is needed
+ override def compare(o1: Object, o2: Object):Int = {
+
+ if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+ // watermark is not expected; always answer -1 when either argument is one
+ -1
+ } else {
+ val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
+ val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+ row1.toString.compareTo(row2.toString)
+ }
+ }
+}
+
+/**
+ * Key selector for tests that returns the Row field at selectorField, cast to Integer
+ */
+class TupleRowSelector(
+ private val selectorField:Int) extends KeySelector[Row, Integer] {
+
+ override def getKey(value: Row): Integer = {
+ value.getField(selectorField).asInstanceOf[Integer]
+ }
+}
+
+}