You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/21 10:17:23 UTC
[6/6] flink git commit: [hotfix] Remove "initialTime" in
WindowOperatorTest
[hotfix] Remove "initialTime" in WindowOperatorTest
I added this once with the though of varying the start time for window
tests but it just makes stuff harder to parse for now.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09e28109
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09e28109
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09e28109
Branch: refs/heads/master
Commit: 09e28109b3707f23230ea50526b7db8aa166df15
Parents: 87195a8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 20 16:44:44 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 21 12:16:59 2016 +0200
----------------------------------------------------------------------
.../operators/windowing/WindowOperatorTest.java | 594 +++++++++----------
1 file changed, 288 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09e28109/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index f606e66..9aaf683 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -77,38 +77,36 @@ public class WindowOperatorTest {
private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
- long initialTime = 0L;
-
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
- testHarness.processWatermark(new Watermark(initialTime + 999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 999));
+ testHarness.processWatermark(new Watermark(999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 999));
expectedOutput.add(new Watermark(999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 1999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 1999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
+ testHarness.processWatermark(new Watermark(1999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 1999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
expectedOutput.add(new Watermark(1999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+ testHarness.processWatermark(new Watermark(2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new Watermark(2999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
@@ -119,25 +117,25 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processWatermark(new Watermark(initialTime + 3999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
+ testHarness.processWatermark(new Watermark(3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
expectedOutput.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 4999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
+ testHarness.processWatermark(new Watermark(4999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 4999));
expectedOutput.add(new Watermark(4999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 5999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+ testHarness.processWatermark(new Watermark(5999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
expectedOutput.add(new Watermark(5999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
// those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 6999));
- testHarness.processWatermark(new Watermark(initialTime + 7999));
+ testHarness.processWatermark(new Watermark(6999));
+ testHarness.processWatermark(new Watermark(7999));
expectedOutput.add(new Watermark(6999));
expectedOutput.add(new Watermark(7999));
@@ -224,30 +222,29 @@ public class WindowOperatorTest {
}
private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
- long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
- testHarness.processWatermark(new Watermark(initialTime + 999));
+ testHarness.processWatermark(new Watermark(999));
expectedOutput.add(new Watermark(999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processWatermark(new Watermark(1999));
expectedOutput.add(new Watermark(1999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
@@ -258,29 +255,29 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processWatermark(new Watermark(initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+ testHarness.processWatermark(new Watermark(2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new Watermark(2999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 3999));
+ testHarness.processWatermark(new Watermark(3999));
expectedOutput.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 4999));
+ testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(new Watermark(4999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 5999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+ testHarness.processWatermark(new Watermark(5999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
expectedOutput.add(new Watermark(5999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
// those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 6999));
- testHarness.processWatermark(new Watermark(initialTime + 7999));
+ testHarness.processWatermark(new Watermark(6999));
+ testHarness.processWatermark(new Watermark(7999));
expectedOutput.add(new Watermark(6999));
expectedOutput.add(new Watermark(7999));
@@ -392,18 +389,17 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
- long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
// do a snapshot, close and restore again
StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -412,28 +408,28 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
- testHarness.processWatermark(new Watermark(initialTime + 12000));
+ testHarness.processWatermark(new Watermark(12000));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
- expectedOutput.add(new Watermark(initialTime + 12000));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
- testHarness.processWatermark(new Watermark(initialTime + 17999));
+ testHarness.processWatermark(new Watermark(17999));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
- expectedOutput.add(new Watermark(initialTime + 17999));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -469,15 +465,14 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
- long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
// do a snapshot, close and restore again
StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -486,29 +481,29 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
- testHarness.processWatermark(new Watermark(initialTime + 12000));
+ testHarness.processWatermark(new Watermark(12000));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
- expectedOutput.add(new Watermark(initialTime + 12000));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
- testHarness.processWatermark(new Watermark(initialTime + 17999));
+ testHarness.processWatermark(new Watermark(17999));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
- expectedOutput.add(new Watermark(initialTime + 17999));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -547,20 +542,19 @@ public class WindowOperatorTest {
new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
// do a snapshot, close and restore again
StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -569,21 +563,21 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 6500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 7000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), initialTime + 6499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
// add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), initialTime + 9999L));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -653,15 +647,14 @@ public class WindowOperatorTest {
new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
// do a snapshot, close and restore again
StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -670,17 +663,17 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 2500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), initialTime + 2500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), 2500));
- testHarness.processWatermark(new Watermark(initialTime + 12000));
+ testHarness.processWatermark(new Watermark(12000));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), initialTime + 3999));
- expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), initialTime + 2999));
- expectedOutput.add(new Watermark(initialTime + 12000));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), 2999));
+ expectedOutput.add(new Watermark(12000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
@@ -716,51 +709,50 @@ public class WindowOperatorTest {
new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// The global window actually ignores these timestamps...
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1000));
+ testHarness.processWatermark(new Watermark(1000));
expectedOutput.add(new Watermark(1000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 2000));
+ testHarness.processWatermark(new Watermark(2000));
expectedOutput.add(new Watermark(2000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 3000));
+ testHarness.processWatermark(new Watermark(3000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
expectedOutput.add(new Watermark(3000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 4000));
+ testHarness.processWatermark(new Watermark(4000));
expectedOutput.add(new Watermark(4000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 5000));
+ testHarness.processWatermark(new Watermark(5000));
expectedOutput.add(new Watermark(5000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 6000));
+ testHarness.processWatermark(new Watermark(6000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
@@ -770,8 +762,8 @@ public class WindowOperatorTest {
// those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 8000));
+ testHarness.processWatermark(new Watermark(7000));
+ testHarness.processWatermark(new Watermark(8000));
expectedOutput.add(new Watermark(7000));
expectedOutput.add(new Watermark(8000));
@@ -810,8 +802,7 @@ public class WindowOperatorTest {
new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
@@ -819,15 +810,15 @@ public class WindowOperatorTest {
// The global window actually ignores these timestamps...
// add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
// do a snapshot, close and restore again
StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -836,18 +827,18 @@ public class WindowOperatorTest {
testHarness.restore(snapshot, 10L);
testHarness.open();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
@@ -959,34 +950,33 @@ public class WindowOperatorTest {
operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 500));
- testHarness.processWatermark(new Watermark(initialTime + 1500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500));
+ testHarness.processWatermark(new Watermark(1500));
- expected.add(new Watermark(initialTime + 1500));
+ expected.add(new Watermark(1500));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1300));
- testHarness.processWatermark(new Watermark(initialTime + 2300));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1300));
+ testHarness.processWatermark(new Watermark(2300));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1999));
- expected.add(new Watermark(initialTime + 2300));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
+ expected.add(new Watermark(2300));
// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1997));
- testHarness.processWatermark(new Watermark(initialTime + 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1997));
+ testHarness.processWatermark(new Watermark(6000));
// this is 1 and not 3 because the trigger fires and purges
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- expected.add(new Watermark(initialTime + 6000));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+ expected.add(new Watermark(6000));
// this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processWatermark(new Watermark(initialTime + 7000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ testHarness.processWatermark(new Watermark(7000));
- expected.add(new Watermark(initialTime + 7000));
+ expected.add(new Watermark(7000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.close();
@@ -1088,34 +1078,33 @@ public class WindowOperatorTest {
operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
- long initialTime = 0L;
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
// normal element
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1985));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1985));
- expected.add(new Watermark(initialTime + 1985));
+ expected.add(new Watermark(1985));
// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1980));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1980));
+ testHarness.processWatermark(new Watermark(1999));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1999));
- expected.add(new Watermark(initialTime + 1999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
+ expected.add(new Watermark(1999));
// dropped as late
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
- testHarness.processWatermark(new Watermark(initialTime + 2999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2001));
+ testHarness.processWatermark(new Watermark(2999));
- expected.add(new Watermark(initialTime + 2999));
+ expected.add(new Watermark(2999));
- testHarness.processWatermark(new Watermark(initialTime + 3999));
+ testHarness.processWatermark(new Watermark(3999));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
- expected.add(new Watermark(initialTime + 3999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+ expected.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.close();
@@ -1151,51 +1140,50 @@ public class WindowOperatorTest {
operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- expected.add(new Watermark(initialTime + 1999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+ expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(3000));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 2999));
- expected.add(new Watermark(initialTime + 3000));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 2999));
+ expected.add(new Watermark(3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
// lateness is set to 0 and window_size = 3 sec and slide 1, the following 2 elements (2400)
// are assigned to windows ending at 2999, 3999, 4999.
// The 2999 is dropped because it is already late (WM = 2999) but the rest are kept.
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2400));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2400));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3900));
- testHarness.processWatermark(new Watermark(initialTime + 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3900));
+ testHarness.processWatermark(new Watermark(6000));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
- expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 3999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 4999));
- expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 4999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), 4999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 4999));
- expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 5999));
- expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 5999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 5999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 5999));
- expected.add(new Watermark(initialTime + 6000));
+ expected.add(new Watermark(6000));
// dropped due to lateness
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
- testHarness.processWatermark(new Watermark(initialTime + 25000));
+ testHarness.processWatermark(new Watermark(25000));
- expected.add(new Watermark(initialTime + 25000));
+ expected.add(new Watermark(25000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.close();
@@ -1231,54 +1219,53 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// this is dropped as late
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+ testHarness.processWatermark(new Watermark(20000));
- expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500l, 17500l), 17499));
+ expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
@@ -1324,54 +1311,53 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// this is dropped as late
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+ testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
@@ -1415,52 +1401,51 @@ public class WindowOperatorTest {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- long initialTime = 0L;
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// dropped as late
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+ testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
@@ -1502,40 +1487,39 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
@@ -1543,9 +1527,9 @@ public class WindowOperatorTest {
// because of the small allowed lateness and because the trigger is accumulating
// this will be merged into the session (11600-14600) and therefore will not
// be dropped as late
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
// adding ("key2", 1) extended the session to (10000-146000) for which
// maxTimestamp <= currentWatermark. Therefore, we immediately get a firing
@@ -1555,12 +1539,12 @@ public class WindowOperatorTest {
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-3", 10000L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
@@ -1600,58 +1584,57 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
-
- long initialTime = 0L;
+
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+ testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
@@ -1691,44 +1674,43 @@ public class WindowOperatorTest {
testHarness.open();
- long initialTime = 0L;
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processWatermark(new Watermark(initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+ testHarness.processWatermark(new Watermark(1999));
expected.add(new Watermark(1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
- testHarness.processWatermark(new Watermark(initialTime + 4998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+ testHarness.processWatermark(new Watermark(4998));
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// after the current watermark
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
- testHarness.processWatermark(new Watermark(initialTime + 7400));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+ testHarness.processWatermark(new Watermark(7400));
expected.add(new Watermark(7400));
// this will merge the two sessions into one
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 11501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+ testHarness.processWatermark(new Watermark(11501));
expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
expected.add(new Watermark(11501));
// new session
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
- testHarness.processWatermark(new Watermark(initialTime + 14600));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+ testHarness.processWatermark(new Watermark(14600));
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
// the maxTimestamp of the merged session is already late,
// so we get an immediate firing
@@ -1737,13 +1719,13 @@ public class WindowOperatorTest {
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+ testHarness.processWatermark(new Watermark(20000));
expected.add(new StreamRecord<>(new Tuple3<>("key2-8", 1000L, 17500L), 17499));
expected.add(new Watermark(20000));
- testHarness.processWatermark(new Watermark(initialTime + 100000));
+ testHarness.processWatermark(new Watermark(100000));
expected.add(new Watermark(100000));
actual = testHarness.getOutput();