You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/26 10:22:02 UTC

[2/2] flink git commit: [FLINK-2074] Fix erroneous emission of Sliding Time PreReducer

[FLINK-2074] Fix erroneous emission of Sliding Time PreReducer

Before this, a sliding time window would keep emitting the last result
because the number of elements per pre-aggregation result was not
correctly reset on eviction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6dbec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6dbec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6dbec

Branch: refs/heads/master
Commit: 6bc6dbec6878f58500370b2e6912ad5022c5bf78
Parents: 4de2353
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri May 22 16:30:58 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 26 10:21:31 2015 +0200

----------------------------------------------------------------------
 .../windowbuffer/SlidingGroupedPreReducer.java  |   1 +
 .../windowbuffer/SlidingTimePreReducer.java     |   1 +
 .../SlidingTimeGroupedPreReducerTest.java       | 168 +++++++++++++++----
 .../windowbuffer/SlidingTimePreReducerTest.java | 116 ++++++++++---
 4 files changed, 230 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
index 0872c6e..09fadf9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -143,6 +143,7 @@ public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
 	@Override
 	protected void resetCurrent() {
 		currentReducedMap = null;
+		elementsSinceLastPreAggregate = 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index 7652d81..d84505c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -89,6 +89,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 
 		if (toRemove > 0 && lastPreAggregateSize == null) {
 			currentReduced = null;
+			elementsSinceLastPreAggregate = 0;
 			toRemove = 0;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3438f42..18a4748 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -23,10 +23,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
@@ -37,61 +41,136 @@ import org.junit.Test;
 public class SlidingTimeGroupedPreReducerTest {
 
 	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+	TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
+
 
 	ReduceFunction<Integer> reducer = new SumReducer();
+	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
+
 
 	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+	KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
 
 	@Test
 	public void testPreReduce1() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// This ensures that the buffer is properly cleared after a burst of elements by
+		// replaying the same sequence of elements with a later timestamp and expecting the same
+		// result.
 
-		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
-				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
-						new Timestamp<Integer>() {
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
 
-							private static final long serialVersionUID = 1L;
+		SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
+				tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
 
-							@Override
-							public long getTimestamp(Integer value) {
-								return value;
-							}
-						}, 1));
+			private static final long serialVersionUID = 1L;
 
-		preReducer.store(1);
-		preReducer.store(2);
+			@Override
+			public long getTimestamp(Tuple2<Integer, Integer> value) {
+				return value.f0;
+			}
+		}, 1));
+
+		int timeOffset = 0;
+
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
 		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
 		preReducer.evict(1);
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
 		preReducer.emitWindow(collector);
-		preReducer.store(13);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
 
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(3, 6));
-		expected.add(StreamWindow.fromElements(5, 10));
-		expected.add(StreamWindow.fromElements(7, 14));
-		expected.add(StreamWindow.fromElements(9, 18));
-		expected.add(StreamWindow.fromElements(11, 22));
+		// ensure that everything is cleared out
+		preReducer.evict(100);
 
-		checkResults(expected, collector.getCollected());
+
+		timeOffset = 25; // a little while later...
+
+		// Repeat the same sequence, this should produce the same result
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
+		preReducer.emitWindow(collector);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
+		preReducer.emitWindow(collector);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+		List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+		timeOffset = 0; // rewind ...
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+				new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+				new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 4, 10),
+				new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 6, 14),
+				new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 8, 18),
+				new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 10, 22),
+				new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
+
+		timeOffset = 25; // and back to the future ...
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+				new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+				new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 4, 10),
+				new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 6, 14),
+				new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 8, 18),
+				new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
+		expected.add(StreamWindow.fromElements(
+				new Tuple2<Integer, Integer>(timeOffset + 10, 22),
+				new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
+
+		assertEquals(expected, collector.getCollected());
 	}
 
 	protected static void checkResults(List<StreamWindow<Integer>> expected,
@@ -277,4 +356,31 @@ public class SlidingTimeGroupedPreReducerTest {
 		}
 
 	}
+
+	private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+		}
+
+	}
+
+	public static class TupleModKey implements KeySelector<Tuple2<Integer, Integer>, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private int m;
+
+		public TupleModKey(int m) {
+			this.m = m;
+		}
+
+		@Override
+		public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+			return value.f1 % m;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index bc3b13b..a48bc0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -22,9 +22,13 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
@@ -34,56 +38,106 @@ import org.junit.Test;
 public class SlidingTimePreReducerTest {
 
 	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+	TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
 
 	ReduceFunction<Integer> reducer = new SumReducer();
+	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
 
 	@Test
 	public void testPreReduce1() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// This ensures that the buffer is properly cleared after a burst of elements by
+		// replaying the same sequence of elements with a later timestamp and expecting the same
+		// result.
 
-		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
-				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+
+		SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
+				tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
 
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public long getTimestamp(Integer value) {
-						return value;
+					public long getTimestamp(Tuple2<Integer, Integer> value) {
+						return value.f0;
 					}
 				}, 1));
 
-		preReducer.store(1);
-		preReducer.store(2);
+		int timeOffset = 0;
+
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
 		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
 		preReducer.evict(1);
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
 		preReducer.emitWindow(collector);
 		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
 		preReducer.emitWindow(collector);
-		preReducer.store(13);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+		// ensure that everything is cleared out
+		preReducer.evict(100);
+
+
+		timeOffset = 25; // a little while later...
+
+		// Repeat the same sequence, this should produce the same result
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
+		preReducer.emitWindow(collector);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
+		preReducer.emitWindow(collector);
+		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+		List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+		timeOffset = 0; // rewind ...
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
+
+		timeOffset = 25; // and back to the future ...
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
+		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
 
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(9));
-		expected.add(StreamWindow.fromElements(15));
-		expected.add(StreamWindow.fromElements(21));
-		expected.add(StreamWindow.fromElements(27));
-		expected.add(StreamWindow.fromElements(33));
 
 		assertEquals(expected, collector.getCollected());
 	}
@@ -254,4 +308,16 @@ public class SlidingTimePreReducerTest {
 		}
 
 	}
+
+	private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+		}
+
+	}
+
 }