You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:15 UTC

[5/5] flink git commit: [FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count

[FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count

Closes #465


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2522f028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2522f028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2522f028

Branch: refs/heads/master
Commit: 2522f028bde9af57ba52904855265e6a8519e724
Parents: aacd4f2
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 13:43:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 11 01:22:10 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DataStreamSource.java        |   3 +
 .../api/datastream/WindowedDataStream.java      |  50 ++--
 .../streaming/api/windowing/WindowUtils.java    |   7 +-
 .../SlidingCountGroupedPreReducer.java          |  86 ++++++
 .../windowbuffer/SlidingGroupedPreReducer.java  | 148 ++++++++++
 .../windowbuffer/SlidingPreReducer.java         |  31 +-
 .../SlidingTimeGroupedPreReducer.java           | 100 +++++++
 .../windowing/WindowIntegrationTest.java        | 155 +++++-----
 .../SlidingCountGroupedPreReducerTest.java      | 220 +++++++++++++++
 .../SlidingTimeGroupedPreReducerTest.java       | 280 +++++++++++++++++++
 10 files changed, 979 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index b596cbd..0dda976 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -35,6 +35,9 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
 			TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) {
 		super(environment, operatorType, outTypeInfo, invokable);
 		this.isParallel = isParallel;
+		if (!isParallel) {
+			setParallelism(1);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 6ce9f9f..8199b22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -56,7 +56,9 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
@@ -337,8 +339,10 @@ public class WindowedDataStream<OUT> {
 	}
 
 	private int getDiscretizerParallelism() {
-		return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
-				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+		return isLocal
+				|| WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+						dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
+				.getDegreeOfParallelism() : 1;
 	}
 
 	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
@@ -381,21 +385,35 @@ public class WindowedDataStream<OUT> {
 							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
 									.createSerializer(getExecutionConfig()));
 				}
-			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
-
-				return new SlidingCountPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
-								.createSerializer(getExecutionConfig()),
-						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-						((CountTriggerPolicy<?>) trigger).getStart());
-
-			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new SlidingCountPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()),
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							((CountTriggerPolicy<?>) trigger).getStart());
+				} else {
+					return new SlidingCountGroupedPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), groupByKey,
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							((CountTriggerPolicy<?>) trigger).getStart());
+				}
 
-				return new SlidingTimePreReducer<OUT>(
-						(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
-								.createSerializer(getExecutionConfig()),
-						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-						WindowUtils.getTimeStampWrapper(trigger));
+			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new SlidingTimePreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+									.createSerializer(getExecutionConfig()),
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							WindowUtils.getTimeStampWrapper(trigger));
+				} else {
+					return new SlidingTimeGroupedPreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+									.createSerializer(getExecutionConfig()), groupByKey,
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							WindowUtils.getTimeStampWrapper(trigger));
+				}
 
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 246aff2..0649b4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -44,9 +44,10 @@ public class WindowUtils {
 		}
 	}
 
-	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
-				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
+			int inputParallelism) {
+		return inputParallelism != 1
+				&& ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
 	}
 
 	public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
new file mode 100644
index 0000000..48bf1b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private long windowSize;
+	private long slideSize;
+	private int start;
+
+	protected long index = 0;
+
+	public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
+		super(reducer, serializer, key);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+			this.start = start;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		index = index - start;
+	}
+
+	@Override
+	protected void afterStore() {
+		index++;
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if (index >= 0) {
+			super.store(element);
+		} else {
+			index++;
+		}
+	}
+
+	@Override
+	protected boolean currentEligible(T next) {
+		if (index <= slideSize) {
+			return true;
+		} else {
+			return index == windowSize;
+		}
+	}
+
+	@Override
+	protected void afterEmit() {
+		if (index >= slideSize) {
+			index = index - slideSize;
+		}
+	}
+
+	@Override
+	public SlidingCountGroupedPreReducer<T> clone() {
+		return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
+				slideSize, start);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
new file mode 100644
index 0000000..aa1d76c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
+	protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
+
+	protected KeySelector<T, ?> key;
+
+	public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key) {
+		super(reducer, serializer);
+		this.key = key;
+	}
+
+	public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+		Map<Object, T> finalReduce = null;
+
+		if (!reducedMap.isEmpty()) {
+			finalReduce = reducedMap.get(0);
+			for (int i = 1; i < reducedMap.size(); i++) {
+				finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
+
+			}
+			if (currentReducedMap != null) {
+				finalReduce = reduceMaps(finalReduce, currentReducedMap);
+			}
+
+		} else {
+			finalReduce = currentReducedMap;
+		}
+
+		if (finalReduce != null) {
+			currentWindow.addAll(finalReduce.values());
+			return true;
+		} else {
+			return false;
+		}
+
+	}
+
+	private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
+
+		Map<Object, T> reduced = new HashMap<Object, T>();
+
+		// Get the common keys in the maps
+		Set<Object> interSection = new HashSet<Object>();
+		Set<Object> diffFirst = new HashSet<Object>();
+		Set<Object> diffSecond = new HashSet<Object>();
+
+		for (Object key : first.keySet()) {
+			if (second.containsKey(key)) {
+				interSection.add(key);
+			} else {
+				diffFirst.add(key);
+			}
+		}
+
+		for (Object key : second.keySet()) {
+			if (!interSection.contains(key)) {
+				diffSecond.add(key);
+			}
+		}
+
+		// Reduce the common keys
+		for (Object key : interSection) {
+			reduced.put(
+					key,
+					reducer.reduce(serializer.copy(first.get(key)),
+							serializer.copy(second.get(key))));
+		}
+
+		for (Object key : diffFirst) {
+			reduced.put(key, first.get(key));
+		}
+
+		for (Object key : diffSecond) {
+			reduced.put(key, second.get(key));
+		}
+
+		return reduced;
+	}
+
+	protected void updateCurrent(T element) throws Exception {
+		if (currentReducedMap == null) {
+			currentReducedMap = new HashMap<Object, T>();
+			currentReducedMap.put(key.getKey(element), element);
+		} else {
+			Object nextKey = key.getKey(element);
+			T last = currentReducedMap.get(nextKey);
+			if (last == null) {
+				currentReducedMap.put(nextKey, element);
+			} else {
+				currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
+			}
+		}
+	}
+
+	@Override
+	protected void removeLastReduced() {
+		reducedMap.removeFirst();
+	}
+
+	@Override
+	protected void addCurrentToBuffer(T element) throws Exception {
+		reducedMap.add(currentReducedMap);
+	}
+
+	@Override
+	protected void resetCurrent() {
+		currentReducedMap = null;
+	}
+
+	@Override
+	protected boolean currentNotEmpty() {
+		return currentReducedMap != null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 1dd126d..27f7ff5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -103,15 +103,23 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 	}
 
 	protected void addToBufferIfEligible(T element) throws Exception {
-		if (currentEligible(element) && currentReduced != null) {
+		if (currentEligible(element) && currentNotEmpty()) {
 			addCurrentToBuffer(element);
 			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
-			elementsSinceLastPreAggregate = 1;
-		} else {
-			updateCurrent(element);
-
-			elementsSinceLastPreAggregate++;
+			elementsSinceLastPreAggregate = 0;
+			resetCurrent();
 		}
+		updateCurrent(element);
+
+		elementsSinceLastPreAggregate++;
+	}
+
+	protected void resetCurrent() {
+		currentReduced = null;
+	}
+
+	protected boolean currentNotEmpty() {
+		return currentReduced != null;
 	}
 
 	protected void updateCurrent(T element) throws Exception {
@@ -122,9 +130,8 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		}
 	}
 
-	protected void addCurrentToBuffer(T element) {
+	protected void addCurrentToBuffer(T element) throws Exception {
 		reduced.add(currentReduced);
-		currentReduced = element;
 	}
 
 	protected abstract boolean currentEligible(T next);
@@ -135,7 +142,7 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
 		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
 			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
-			reduced.removeFirst();
+			removeLastReduced();
 			lastPreAggregateSize = elementsPerPreAggregate.peek();
 		}
 
@@ -144,7 +151,11 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		}
 	}
 
-	public int max(int a, int b) {
+	protected void removeLastReduced() {
+		reduced.removeFirst();
+	}
+
+	public static int max(int a, int b) {
 		if (a > b) {
 			return a;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
new file mode 100644
index 0000000..1c293af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private long windowSize;
+	private long slideSize;
+	private TimestampWrapper<T> timestampWrapper;
+	private T lastStored;
+	protected long windowStartTime;
+
+	public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key, long windowSize, long slideSize,
+			TimestampWrapper<T> timestampWrapper) {
+		super(reducer, serializer, key);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime();
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		super.store(element);
+		lastStored = element;
+	}
+
+	@Override
+	public SlidingTimeGroupedPreReducer<T> clone() {
+		return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
+				timestampWrapper);
+	}
+
+	@Override
+	public String toString() {
+		return currentReducedMap.toString();
+	}
+
+	@Override
+	protected void afterEmit() {
+		long lastTime = timestampWrapper.getTimestamp(lastStored);
+		if (lastTime - windowStartTime >= slideSize) {
+			windowStartTime = windowStartTime + slideSize;
+		}
+	}
+
+	@Override
+	public void evict(int n) {
+		toRemove += n;
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			removeLastReduced();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (toRemove > 0 && lastPreAggregateSize == null) {
+			resetCurrent();
+			toRemove = 0;
+		}
+	}
+
+	@Override
+	protected boolean currentEligible(T next) {
+		return windowStartTime == timestampWrapper.getStartTime()
+				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index bcfa188..d7338a0 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -44,7 +44,7 @@ public class WindowIntegrationTest implements Serializable {
 	private static final Integer MEMORYSIZE = 32;
 
 	@SuppressWarnings("serial")
-	private static class ModKey implements KeySelector<Integer, Integer> {
+	public static class ModKey implements KeySelector<Integer, Integer> {
 		private int m;
 
 		public ModKey(int m) {
@@ -126,82 +126,93 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink5());
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+				.getDiscretizedStream().addSink(new DistributedSink5());
 
 		env.execute();
 
-		// sum ( Time of 3 slide 2 )
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(5));
-		expected1.add(StreamWindow.fromElements(11));
-		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(10));
-		expected1.add(StreamWindow.fromElements(32));
-
-		validateOutput(expected1, CentralSink1.windows);
-
-		// Tumbling Time of 4 grouped by mod 2
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2, 4));
-		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(5));
-		expected2.add(StreamWindow.fromElements(10));
-		expected2.add(StreamWindow.fromElements(11, 11));
-
-		validateOutput(expected2, CentralSink2.windows);
-
-		// groupby mod 2 sum ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
-		expected3.add(StreamWindow.fromElements(4));
-		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(22));
-		expected3.add(StreamWindow.fromElements(8));
-		expected3.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected3, DistributedSink1.windows);
-
-		// groupby mod3 Tumbling Count of 2 grouped by mod 2
-		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
-		expected4.add(StreamWindow.fromElements(2, 2));
-		expected4.add(StreamWindow.fromElements(1));
-		expected4.add(StreamWindow.fromElements(4));
-		expected4.add(StreamWindow.fromElements(5, 11));
-		expected4.add(StreamWindow.fromElements(10));
-		expected4.add(StreamWindow.fromElements(11));
-		expected4.add(StreamWindow.fromElements(3));
-
-		validateOutput(expected4, DistributedSink2.windows);
-
-		// min ( Time of 2 slide 3 )
-		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(1));
-		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected5, CentralSink3.windows);
-
-		// groupby mod 2 max ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
-		expected6.add(StreamWindow.fromElements(3));
-		expected6.add(StreamWindow.fromElements(5));
-		expected6.add(StreamWindow.fromElements(11));
-		expected6.add(StreamWindow.fromElements(4));
-		expected6.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected6, DistributedSink3.windows);
-
-		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		expected7.add(StreamWindow.fromElements(10));
-		expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-		validateOutput(expected7, DistributedSink4.windows);
+		 // sum ( Time of 3 slide 2 )
+		 List<StreamWindow<Integer>> expected1 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected1.add(StreamWindow.fromElements(5));
+		 expected1.add(StreamWindow.fromElements(11));
+		 expected1.add(StreamWindow.fromElements(9));
+		 expected1.add(StreamWindow.fromElements(10));
+		 expected1.add(StreamWindow.fromElements(32));
+		
+		 validateOutput(expected1, CentralSink1.windows);
+		
+		 // Tumbling Time of 4 grouped by mod 2
+		 List<StreamWindow<Integer>> expected2 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected2.add(StreamWindow.fromElements(2, 2, 4));
+		 expected2.add(StreamWindow.fromElements(1, 3));
+		 expected2.add(StreamWindow.fromElements(5));
+		 expected2.add(StreamWindow.fromElements(10));
+		 expected2.add(StreamWindow.fromElements(11, 11));
+		
+		 validateOutput(expected2, CentralSink2.windows);
+		
+		 // groupby mod 2 sum ( Tumbling Time of 4)
+		 List<StreamWindow<Integer>> expected3 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected3.add(StreamWindow.fromElements(4));
+		 expected3.add(StreamWindow.fromElements(5));
+		 expected3.add(StreamWindow.fromElements(22));
+		 expected3.add(StreamWindow.fromElements(8));
+		 expected3.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected3, DistributedSink1.windows);
+		
+		 // groupby mod3 Tumbling Count of 2 grouped by mod 2
+		 List<StreamWindow<Integer>> expected4 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected4.add(StreamWindow.fromElements(2, 2));
+		 expected4.add(StreamWindow.fromElements(1));
+		 expected4.add(StreamWindow.fromElements(4));
+		 expected4.add(StreamWindow.fromElements(5, 11));
+		 expected4.add(StreamWindow.fromElements(10));
+		 expected4.add(StreamWindow.fromElements(11));
+		 expected4.add(StreamWindow.fromElements(3));
+		
+		 validateOutput(expected4, DistributedSink2.windows);
+		
+		 // min ( Time of 2 slide 3 )
+		 List<StreamWindow<Integer>> expected5 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected5.add(StreamWindow.fromElements(1));
+		 expected5.add(StreamWindow.fromElements(4));
+		 expected5.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected5, CentralSink3.windows);
+		
+		 // groupby mod 2 max ( Tumbling Time of 4)
+		 List<StreamWindow<Integer>> expected6 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected6.add(StreamWindow.fromElements(3));
+		 expected6.add(StreamWindow.fromElements(5));
+		 expected6.add(StreamWindow.fromElements(11));
+		 expected6.add(StreamWindow.fromElements(4));
+		 expected6.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected6, DistributedSink3.windows);
+		
+		 List<StreamWindow<Integer>> expected7 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		 expected7.add(StreamWindow.fromElements(10));
+		 expected7.add(StreamWindow.fromElements(10, 11, 11));
+		
+		 validateOutput(expected7, DistributedSink4.windows);
 
 		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
-		expected8.add(StreamWindow.fromElements(12));
-		expected8.add(StreamWindow.fromElements(9));
-		expected8.add(StreamWindow.fromElements(32));
+		expected8.add(StreamWindow.fromElements(4, 8));
+		expected8.add(StreamWindow.fromElements(4, 5));
+		expected8.add(StreamWindow.fromElements(10, 22));
+
+		for (List<Integer> sw : DistributedSink5.windows) {
+			Collections.sort(sw);
+		}
 
 		validateOutput(expected8, DistributedSink5.windows);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
new file mode 100644
index 0000000..4e63f89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountGroupedPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(9, 18));
+		expected.add(StreamWindow.fromElements(11, 22));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 2, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(12, 8));
+		expected.add(StreamWindow.fromElements(18, 12));
+		expected.add(StreamWindow.fromElements(24, 16));
+		expected.add(StreamWindow.fromElements(30, 20));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 6, 3, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(2, 4));
+		expected.add(StreamWindow.fromElements(9, 12));
+		expected.add(StreamWindow.fromElements(21, 18));
+		expected.add(StreamWindow.fromElements(30, 27));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 1, 2);
+
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.emitWindow(collector);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(7);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1));
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(9, 6));
+		expected.add(StreamWindow.fromElements(8, 12));
+		expected.add(StreamWindow.fromElements(15, 10));
+		expected.add(StreamWindow.fromElements(12, 18));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
new file mode 100644
index 0000000..83ad7ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimeGroupedPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(9, 18));
+		expected.add(StreamWindow.fromElements(11, 22));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	protected static void checkResults(List<StreamWindow<Integer>> expected,
+			List<StreamWindow<Integer>> actual) {
+
+		for (StreamWindow<Integer> sw : expected) {
+			Collections.sort(sw);
+		}
+
+		for (StreamWindow<Integer> sw : actual) {
+			Collections.sort(sw);
+		}
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(12, 8));
+		expected.add(StreamWindow.fromElements(18, 12));
+		expected.add(StreamWindow.fromElements(24, 16));
+		expected.add(StreamWindow.fromElements(30, 20));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(2, 4));
+		expected.add(StreamWindow.fromElements(9, 12));
+		expected.add(StreamWindow.fromElements(21, 18));
+		expected.add(StreamWindow.fromElements(30, 27));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(14);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.store(21);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+
+		preReducer.store(9);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(21));
+
+		checkResults(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+}