You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/21 10:17:23 UTC

[6/6] flink git commit: [hotfix] Remove "initialTime" in WindowOperatorTest

[hotfix] Remove "initialTime" in WindowOperatorTest

I added this once with the though of varying the start time for window
tests but it just makes stuff harder to parse for now.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09e28109
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09e28109
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09e28109

Branch: refs/heads/master
Commit: 09e28109b3707f23230ea50526b7db8aa166df15
Parents: 87195a8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 20 16:44:44 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 21 12:16:59 2016 +0200

----------------------------------------------------------------------
 .../operators/windowing/WindowOperatorTest.java | 594 +++++++++----------
 1 file changed, 288 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09e28109/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index f606e66..9aaf683 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -77,38 +77,36 @@ public class WindowOperatorTest {
 
 	private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
 
-		long initialTime = 0L;
-
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 999));
+		testHarness.processWatermark(new Watermark(999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 999));
 		expectedOutput.add(new Watermark(999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
+		testHarness.processWatermark(new Watermark(1999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 1999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
 		expectedOutput.add(new Watermark(1999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+		testHarness.processWatermark(new Watermark(2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
 		expectedOutput.add(new Watermark(2999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
@@ -119,25 +117,25 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
+		testHarness.processWatermark(new Watermark(3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
 		expectedOutput.add(new Watermark(3999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
+		testHarness.processWatermark(new Watermark(4999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 4999));
 		expectedOutput.add(new Watermark(4999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+		testHarness.processWatermark(new Watermark(5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
 		expectedOutput.add(new Watermark(5999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
+		testHarness.processWatermark(new Watermark(6999));
+		testHarness.processWatermark(new Watermark(7999));
 		expectedOutput.add(new Watermark(6999));
 		expectedOutput.add(new Watermark(7999));
 
@@ -224,30 +222,29 @@ public class WindowOperatorTest {
 	}
 
 	private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
-		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 999));
+		testHarness.processWatermark(new Watermark(999));
 		expectedOutput.add(new Watermark(999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processWatermark(new Watermark(1999));
 		expectedOutput.add(new Watermark(1999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
@@ -258,29 +255,29 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+		testHarness.processWatermark(new Watermark(2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
 		expectedOutput.add(new Watermark(2999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
+		testHarness.processWatermark(new Watermark(3999));
 		expectedOutput.add(new Watermark(3999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
+		testHarness.processWatermark(new Watermark(4999));
 		expectedOutput.add(new Watermark(4999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+		testHarness.processWatermark(new Watermark(5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
 		expectedOutput.add(new Watermark(5999));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
 		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
+		testHarness.processWatermark(new Watermark(6999));
+		testHarness.processWatermark(new Watermark(7999));
 		expectedOutput.add(new Watermark(6999));
 		expectedOutput.add(new Watermark(7999));
 
@@ -392,18 +389,17 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
 		// do a snapshot, close and restore again
 		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -412,28 +408,28 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
 
-		testHarness.processWatermark(new Watermark(initialTime + 12000));
+		testHarness.processWatermark(new Watermark(12000));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
-		expectedOutput.add(new Watermark(initialTime + 12000));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 17999));
+		testHarness.processWatermark(new Watermark(17999));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
-		expectedOutput.add(new Watermark(initialTime + 17999));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -469,15 +465,14 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
 
 		// do a snapshot, close and restore again
 		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -486,29 +481,29 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 6050));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
 
-		testHarness.processWatermark(new Watermark(initialTime + 12000));
+		testHarness.processWatermark(new Watermark(12000));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
-		expectedOutput.add(new Watermark(initialTime + 12000));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 15000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), initialTime + 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 17999));
+		testHarness.processWatermark(new Watermark(17999));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), initialTime + 17999));
-		expectedOutput.add(new Watermark(initialTime + 17999));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -547,20 +542,19 @@ public class WindowOperatorTest {
 				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 
 		// do a snapshot, close and restore again
 		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -569,21 +563,21 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 6500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
 
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), initialTime + 6499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
 		// add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), initialTime + 9999L));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -653,15 +647,14 @@ public class WindowOperatorTest {
 				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
 
 		// do a snapshot, close and restore again
 		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -670,17 +663,17 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 2500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), initialTime + 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), 2500));
 
-		testHarness.processWatermark(new Watermark(initialTime + 12000));
+		testHarness.processWatermark(new Watermark(12000));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), initialTime + 3999));
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), initialTime + 2999));
-		expectedOutput.add(new Watermark(initialTime + 12000));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), 2999));
+		expectedOutput.add(new Watermark(12000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
@@ -716,51 +709,50 @@ public class WindowOperatorTest {
 				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
 		// The global window actually ignores these timestamps...
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 1000));
+		testHarness.processWatermark(new Watermark(1000));
 		expectedOutput.add(new Watermark(1000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
 
-		testHarness.processWatermark(new Watermark(initialTime + 2000));
+		testHarness.processWatermark(new Watermark(2000));
 		expectedOutput.add(new Watermark(2000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 3000));
+		testHarness.processWatermark(new Watermark(3000));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
 		expectedOutput.add(new Watermark(3000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 4000));
+		testHarness.processWatermark(new Watermark(4000));
 		expectedOutput.add(new Watermark(4000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 5000));
+		testHarness.processWatermark(new Watermark(5000));
 		expectedOutput.add(new Watermark(5000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
+		testHarness.processWatermark(new Watermark(6000));
 
  		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
 
@@ -770,8 +762,8 @@ public class WindowOperatorTest {
 
 
 		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 8000));
+		testHarness.processWatermark(new Watermark(7000));
+		testHarness.processWatermark(new Watermark(8000));
 		expectedOutput.add(new Watermark(7000));
 		expectedOutput.add(new Watermark(8000));
 
@@ -810,8 +802,7 @@ public class WindowOperatorTest {
 				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -819,15 +810,15 @@ public class WindowOperatorTest {
 		// The global window actually ignores these timestamps...
 
 		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
 
 		// do a snapshot, close and restore again
 		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
@@ -836,18 +827,18 @@ public class WindowOperatorTest {
 		testHarness.restore(snapshot, 10L);
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
 
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
@@ -959,34 +950,33 @@ public class WindowOperatorTest {
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 500));
-		testHarness.processWatermark(new Watermark(initialTime + 1500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500));
+		testHarness.processWatermark(new Watermark(1500));
 
-		expected.add(new Watermark(initialTime + 1500));
+		expected.add(new Watermark(1500));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1300));
-		testHarness.processWatermark(new Watermark(initialTime + 2300));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1300));
+		testHarness.processWatermark(new Watermark(2300));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1999));
-		expected.add(new Watermark(initialTime + 2300));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
+		expected.add(new Watermark(2300));
 
 		// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1997));
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1997));
+		testHarness.processWatermark(new Watermark(6000));
 
 		// this is 1 and not 3 because the trigger fires and purges
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		expected.add(new Watermark(initialTime + 6000));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		expected.add(new Watermark(6000));
 
 		// this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processWatermark(new Watermark(initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processWatermark(new Watermark(7000));
 
-		expected.add(new Watermark(initialTime + 7000));
+		expected.add(new Watermark(7000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
 		testHarness.close();
@@ -1088,34 +1078,33 @@ public class WindowOperatorTest {
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
 
-		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
 		// normal element
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1985));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1985));
 
-		expected.add(new Watermark(initialTime + 1985));
+		expected.add(new Watermark(1985));
 
 		// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1980));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1980));
+		testHarness.processWatermark(new Watermark(1999));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1999));
-		expected.add(new Watermark(initialTime + 1999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
+		expected.add(new Watermark(1999));
 
 		// dropped as late
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2001));
+		testHarness.processWatermark(new Watermark(2999));
 
-		expected.add(new Watermark(initialTime + 2999));
+		expected.add(new Watermark(2999));
 
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
+		testHarness.processWatermark(new Watermark(3999));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		expected.add(new Watermark(initialTime + 3999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		expected.add(new Watermark(3999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
 		testHarness.close();
@@ -1151,51 +1140,50 @@ public class WindowOperatorTest {
 
 		operator.setInputType(inputType, new ExecutionConfig());
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		expected.add(new Watermark(initialTime + 1999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(3000));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 2999));
-		expected.add(new Watermark(initialTime + 3000));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 2999));
+		expected.add(new Watermark(3000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
 
 		// lateness is set to 0 and window_size = 3 sec and slide 1, the following 2 elements (2400)
 		// are assigned to windows ending at 2999, 3999, 4999.
 		// The 2999 is dropped because it is already late (WM = 2999) but the rest are kept.
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2400));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2400));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3900));
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3900));
+		testHarness.processWatermark(new Watermark(6000));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
-		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 3999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), 3999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 3999));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 4999));
-		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 4999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), 4999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 4999));
 
-		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 5999));
-		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 5999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 5999));
+		expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 5999));
 
-		expected.add(new Watermark(initialTime + 6000));
+		expected.add(new Watermark(6000));
 
 		// dropped due to lateness
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 3001));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
 
-		testHarness.processWatermark(new Watermark(initialTime + 25000));
+		testHarness.processWatermark(new Watermark(25000));
 
-		expected.add(new Watermark(initialTime + 25000));
+		expected.add(new Watermark(25000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
 		testHarness.close();
@@ -1231,54 +1219,53 @@ public class WindowOperatorTest {
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
 		// this is dropped as late
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+		testHarness.processWatermark(new Watermark(20000));
 
-		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500l, 17500l), 17499));
+		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 
 		expected.add(new Watermark(100000));
 
@@ -1324,54 +1311,53 @@ public class WindowOperatorTest {
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
 		// this is dropped as late
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+		testHarness.processWatermark(new Watermark(20000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
@@ -1415,52 +1401,51 @@ public class WindowOperatorTest {
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
-		long initialTime = 0L;
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
 		// dropped as late
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+		testHarness.processWatermark(new Watermark(20000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 		expected.add(new Watermark(100000));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
@@ -1502,40 +1487,39 @@ public class WindowOperatorTest {
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
@@ -1543,9 +1527,9 @@ public class WindowOperatorTest {
 		// because of the small allowed lateness and because the trigger is accumulating
 		// this will be merged into the session (11600-14600) and therefore will not
 		// be dropped as late
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
 
 		// adding ("key2", 1) extended the session to (10000-146000) for which
 		// maxTimestamp <= currentWatermark. Therefore, we immediately get a firing
@@ -1555,12 +1539,12 @@ public class WindowOperatorTest {
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processWatermark(new Watermark(20000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-3", 10000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 
 		expected.add(new Watermark(100000));
 
@@ -1600,58 +1584,57 @@ public class WindowOperatorTest {
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-
-		long initialTime = 0L;
+		
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999));
 
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+		testHarness.processWatermark(new Watermark(20000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 
 		expected.add(new Watermark(100000));
 
@@ -1691,44 +1674,43 @@ public class WindowOperatorTest {
 
 		testHarness.open();
 
-		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+		testHarness.processWatermark(new Watermark(1999));
 
 		expected.add(new Watermark(1999));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2000));
-		testHarness.processWatermark(new Watermark(initialTime + 4998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2000));
+		testHarness.processWatermark(new Watermark(4998));
 
 		expected.add(new Watermark(4998));
 
 		// this will not be dropped because the session we're adding two has maxTimestamp
 		// after the current watermark
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 8500));
-		testHarness.processWatermark(new Watermark(initialTime + 7400));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 8500));
+		testHarness.processWatermark(new Watermark(7400));
 
 		expected.add(new Watermark(7400));
 
 		// this will merge the two sessions into one
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 11501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processWatermark(new Watermark(11501));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-5", 1000L, 11500L), 11499));
 		expected.add(new Watermark(11501));
 
 		// new session
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 11600));
-		testHarness.processWatermark(new Watermark(initialTime + 14600));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 11600));
+		testHarness.processWatermark(new Watermark(14600));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
 		expected.add(new Watermark(14600));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
 
 		// the maxTimestamp of the merged session is already late,
 		// so we get an immediate firing
@@ -1737,13 +1719,13 @@ public class WindowOperatorTest {
 		ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 14500));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
+		testHarness.processWatermark(new Watermark(20000));
 
 		expected.add(new StreamRecord<>(new Tuple3<>("key2-8", 1000L, 17500L), 17499));
 		expected.add(new Watermark(20000));
 
-		testHarness.processWatermark(new Watermark(initialTime + 100000));
+		testHarness.processWatermark(new Watermark(100000));
 		expected.add(new Watermark(100000));
 
 		actual = testHarness.getOutput();