You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:12 UTC

[2/5] flink git commit: [FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count

[FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70abc16d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70abc16d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70abc16d

Branch: refs/heads/master
Commit: 70abc16df78d47c62b7cc7f1545542b330562113
Parents: 2be00ac
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 18:22:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  51 +++-
 .../windowing/policy/CountEvictionPolicy.java   |   8 +
 .../windowing/policy/CountTriggerPolicy.java    |   4 +
 .../windowing/policy/TimeEvictionPolicy.java    |   8 +
 .../api/windowing/policy/TimeTriggerPolicy.java |   4 +
 .../windowbuffer/BasicWindowBuffer.java         |   4 -
 .../windowbuffer/SlidingCountPreReducer.java    |  83 ++++++
 .../windowbuffer/SlidingPreReducer.java         | 141 ++++++++++
 .../windowbuffer/SlidingTimePreReducer.java     |  99 +++++++
 .../windowbuffer/TumblingGroupedPreReducer.java |   7 -
 .../windowbuffer/TumblingPreReducer.java        |   7 -
 .../windowing/windowbuffer/WindowBuffer.java    |   2 -
 .../windowing/WindowIntegrationTest.java        |  37 ++-
 .../windowbuffer/BasicWindowBufferTest.java     |   4 -
 .../SlidingCountPreReducerTest.java             | 216 ++++++++++++++++
 .../windowbuffer/SlidingTimePreReducerTest.java | 257 +++++++++++++++++++
 .../TumblingGroupedPreReducerTest.java          |   7 -
 .../windowbuffer/TumblingPreReducerTest.java    |   6 -
 18 files changed, 894 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 742ff22..4c7da88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,15 +46,21 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.WindowUtils;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
@@ -365,16 +371,41 @@ public class WindowedDataStream<OUT> {
 	@SuppressWarnings("unchecked")
 	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
 
-		if (transformation == WindowTransformation.REDUCEWINDOW
-				&& getEviction() instanceof TumblingEvictionPolicy) {
-			if (groupByKey == null) {
-				return new TumblingPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
-								.createSerializer(getExecutionConfig()));
-			} else {
-				return new TumblingGroupedPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
-								.createSerializer(getExecutionConfig()));
+		if (transformation == WindowTransformation.REDUCEWINDOW) {
+			if (getTrigger() instanceof TumblingEvictionPolicy) {
+				if (groupByKey == null) {
+					return new TumblingPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+									.createSerializer(getExecutionConfig()));
+				} else {
+					return new TumblingGroupedPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
+							getType().createSerializer(getExecutionConfig()));
+				}
+			} else if (getTrigger() instanceof CountTriggerPolicy
+					&& getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
+
+				int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+				int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
+				int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
+				if (slide < window) {
+					return new SlidingCountPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), window,
+							slide, start);
+				}
+			} else if (getTrigger() instanceof TimeTriggerPolicy
+					&& getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
+				int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+				int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
+				TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
+						.getTimeStampWrapper();
+				if (slide < window) {
+					return new SlidingTimePreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), window,
+							slide, wrapper);
+				}
 			}
 		}
 		return new BasicWindowBuffer<OUT>();

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 3ede27b..07478c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -136,6 +136,14 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
 		}
 	}
 
+	public int getWindowSize() {
+		return maxElements;
+	}
+
+	public int getStart() {
+		return startValue;
+	}
+
 	@Override
 	public String toString() {
 		return "CountPolicy(" + maxElements + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index 6d8149a..d439f72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -102,6 +102,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
 		}
 	}
 
+	public int getSlideSize() {
+		return max;
+	}
+
 	@Override
 	public String toString() {
 		return "CountPolicy(" + max + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 982b6d5..ae17e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -150,10 +150,18 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		}
 	}
 
+	public long getWindowSize() {
+		return granularity;
+	}
+
 	@Override
 	public String toString() {
 		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
 				+ ")";
 	}
 
+	public TimestampWrapper<DATA> getTimeStampWrapper() {
+		return timestampWrapper;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 7065582..ce8f16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -213,6 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		}
 	}
 
+	public long getSlideSize() {
+		return granularity;
+	}
+
 	@Override
 	public String toString() {
 		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 458de41..8e39398 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -63,10 +63,6 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
 		}
 	}
 
-	public int size() {
-		return buffer.size();
-	}
-
 	@Override
 	public BasicWindowBuffer<T> clone() {
 		return new BasicWindowBuffer<T>();

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
new file mode 100644
index 0000000..ff358cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Non-grouped pre-reducer for sliding count windows (window size larger than slide size).
+ */
+public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private int windowSize;
+	private int slideSize;
+	private int start;
+
+	public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			int windowSize, int slideSize, int start) {
+		super(reducer, serializer);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+			this.start = start;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		index = index - start;
+	}
+
+	@Override
+	public SlidingCountPreReducer<T> clone() {
+		return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if (index >= 0) {
+			super.store(element);
+		} else {
+			index++;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+	@Override
+	protected boolean addCurrentToReduce(T next) {
+		if (index <= slideSize) {
+			return true;
+		} else {
+			return index == windowSize;
+		}
+	}
+
+	@Override
+	protected void updateIndexAtEmit() {
+		if (index >= slideSize) {
+			index = index - slideSize;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
new file mode 100644
index 0000000..a21bb61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+
+	private static final long serialVersionUID = 1L;
+
+	protected ReduceFunction<T> reducer;
+
+	protected T currentReduced;
+	protected LinkedList<T> reduced = new LinkedList<T>();
+	protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
+
+	protected TypeSerializer<T> serializer;
+
+	protected int index = 0;
+	protected int toRemove = 0;
+
+	protected int elementsSinceLastPreAggregate = 0;
+
+	public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+		this.reducer = reducer;
+		this.serializer = serializer;
+	}
+
+	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+		StreamWindow<T> currentWindow = new StreamWindow<T>();
+		T finalAggregate = getFinalAggregate();
+		if (finalAggregate != null) {
+			currentWindow.add(finalAggregate);
+			collector.collect(currentWindow);
+			updateIndexAtEmit();
+			return true;
+		} else {
+			updateIndexAtEmit();
+			return false;
+		}
+
+	}
+
+	protected abstract void updateIndexAtEmit();
+
+	public T getFinalAggregate() {
+		try {
+			if (!reduced.isEmpty()) {
+				T finalReduce = reduced.get(0);
+				for (int i = 1; i < reduced.size(); i++) {
+					finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+				}
+				if (currentReduced != null) {
+					finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
+				}
+				return finalReduce;
+			} else {
+				return currentReduced;
+			}
+
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void store(T element) throws Exception {
+		addToBufferIfEligible(element);
+		index++;
+	}
+
+	protected void addToBufferIfEligible(T element) throws Exception {
+		if (addCurrentToReduce(element) && currentReduced != null) {
+			reduced.add(currentReduced);
+			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
+			currentReduced = element;
+			elementsSinceLastPreAggregate = 1;
+		} else {
+			if (currentReduced == null) {
+				currentReduced = element;
+			} else {
+				currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+			}
+			elementsSinceLastPreAggregate++;
+		}
+	}
+
+	protected abstract boolean addCurrentToReduce(T next);
+
+	public void evict(int n) {
+		toRemove += n;
+
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			reduced.removeFirst();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (lastPreAggregateSize == null) {
+			toRemove = 0;
+		}
+	}
+
+	public int max(int a, int b) {
+		if (a > b) {
+			return a;
+		} else {
+			return b;
+		}
+	}
+
+	@Override
+	public abstract SlidingPreReducer<T> clone();
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
new file mode 100644
index 0000000..af2239c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for sliding time windows (window size larger than slide size).
+ */
+public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private int windowSize;
+	private int slideSize;
+	private TimestampWrapper<T> timestampWrapper;
+	private T lastStored;
+	protected long windowStartTime;
+
+	public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+		super(reducer, serializer);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime();
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		super.store(element);
+		lastStored = element;
+	}
+
+	@Override
+	public SlidingTimePreReducer<T> clone() {
+		return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
+				timestampWrapper);
+	}
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+	@Override
+	protected void updateIndexAtEmit() {
+		index = 0;
+		long lastTime = timestampWrapper.getTimestamp(lastStored);
+		if (lastTime - windowStartTime >= slideSize) {
+			windowStartTime = windowStartTime + slideSize;
+		}
+	}
+
+	@Override
+	public void evict(int n) {
+		toRemove += n;
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			reduced.removeFirst();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (toRemove > 0 && lastPreAggregateSize == null) {
+			currentReduced = null;
+			toRemove = 0;
+		}
+	}
+
+	@Override
+	protected boolean addCurrentToReduce(T next) {
+		return windowStartTime == timestampWrapper.getStartTime()
+				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 7403ffe..9431a99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -38,7 +38,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 
 	private Map<Object, T> reducedValues;
 
-	private int numOfElements = 0;
 	private TypeSerializer<T> serializer;
 
 	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
@@ -56,7 +55,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 			currentWindow.addAll(reducedValues.values());
 			collector.collect(currentWindow);
 			reducedValues.clear();
-			numOfElements = 0;
 			return true;
 		} else {
 			return false;
@@ -76,16 +74,11 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 		}
 
 		reducedValues.put(key, reduced);
-		numOfElements++;
 	}
 
 	public void evict(int n) {
 	}
 
-	public int size() {
-		return numOfElements;
-	}
-
 	@Override
 	public TumblingGroupedPreReducer<T> clone() {
 		return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 35cf60e..58b30a6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -32,7 +32,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 	private ReduceFunction<T> reducer;
 
 	private T reduced;
-	private int numOfElements = 0;
 	private TypeSerializer<T> serializer;
 
 	public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
@@ -46,7 +45,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 			currentWindow.add(reduced);
 			collector.collect(currentWindow);
 			reduced = null;
-			numOfElements = 0;
 			return true;
 		} else {
 			return false;
@@ -59,16 +57,11 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 		} else {
 			reduced = reducer.reduce(serializer.copy(reduced), element);
 		}
-		numOfElements++;
 	}
 
 	public void evict(int n) {
 	}
 
-	public int size() {
-		return numOfElements;
-	}
-
 	@Override
 	public TumblingPreReducer<T> clone() {
 		return new TumblingPreReducer<T>(reducer, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index e0429ab..2dd50db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -34,8 +34,6 @@ public interface WindowBuffer<T> extends Serializable, Cloneable {
 
 	public boolean emitWindow(Collector<StreamWindow<T>> collector);
 
-	public int size();
-
 	public WindowBuffer<T> clone();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 25b5f87..bcfa188 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,6 +89,8 @@ public class WindowIntegrationTest implements Serializable {
 		inputs.add(11);
 		inputs.add(11);
 
+		KeySelector<Integer, ?> key = new ModKey(2);
+
 		Timestamp<Integer> ts = new Timestamp<Integer>() {
 
 			private static final long serialVersionUID = 1L;
@@ -103,14 +105,12 @@ public class WindowIntegrationTest implements Serializable {
 
 		DataStream<Integer> source = env.fromCollection(inputs);
 
-		source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new CentralSink1());
 
 		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
 				.flatten().addSink(new CentralSink2());
 
-		KeySelector<Integer, ?> key = new ModKey(2);
-
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new DistributedSink1());
 
@@ -126,12 +126,17 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new DistributedSink5());
+
 		env.execute();
 
-		// sum ( Time of 2 slide 3 )
+		// sum ( Time of 3 slide 2 )
 		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
 		expected1.add(StreamWindow.fromElements(5));
+		expected1.add(StreamWindow.fromElements(11));
 		expected1.add(StreamWindow.fromElements(9));
+		expected1.add(StreamWindow.fromElements(10));
 		expected1.add(StreamWindow.fromElements(32));
 
 		validateOutput(expected1, CentralSink1.windows);
@@ -193,6 +198,13 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected7, DistributedSink4.windows);
 
+		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+		expected8.add(StreamWindow.fromElements(12));
+		expected8.add(StreamWindow.fromElements(9));
+		expected8.add(StreamWindow.fromElements(32));
+
+		validateOutput(expected8, DistributedSink5.windows);
+
 	}
 
 	public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -317,4 +329,21 @@ public class WindowIntegrationTest implements Serializable {
 		}
 
 	}
+
+	@SuppressWarnings("serial")
+	private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
+		// Test sink: every window passed to invoke() is appended to this static, synchronized list.
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+		// cancel() is intentionally a no-op for this sink.
+		@Override
+		public void cancel() {
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index bb756c7..967c719 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -41,8 +41,6 @@ public class BasicWindowBufferTest {
 		wb.store(2);
 		wb.store(10);
 
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
@@ -51,8 +49,6 @@ public class BasicWindowBufferTest {
 		wb.store(4);
 		wb.evict(2);
 
-		assertEquals(1, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
new file mode 100644
index 0000000..3ce65f1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 3, 2, 0 are presumably window size, slide size and start offset - TODO confirm against SlidingCountPreReducer.
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 3, 2, 0);
+		// Store 1..13 with evict/emitWindow calls in between; an emit follows every second store up to 12.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One single-value window per emit: the sum of the stored elements left after dropping the oldest evict(n) ones.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(27));
+		expected.add(StreamWindow.fromElements(33));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 5, 2, 0 are presumably window size, slide size and start offset - TODO confirm against SlidingCountPreReducer.
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 5, 2, 0);
+		// Store 1..13; the first evict only happens once six elements have been stored.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One single-value window per emit: 1+2, 1+..+4, then the sum of the five most recently stored elements.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(30));
+		expected.add(StreamWindow.fromElements(40));
+		expected.add(StreamWindow.fromElements(50));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 6, 3, 0 are presumably window size, slide size and start offset - TODO confirm against SlidingCountPreReducer.
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 6, 3, 0);
+		// Store 1..13 in groups of three; the final evict(3)/store(13) come after the last emit and are not asserted on.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+		// One single-value window per emit: 1+2+3, 1+..+6, 4+..+9, 7+..+12.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(39));
+		expected.add(StreamWindow.fromElements(57));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 5, 1, 2: presumably window size 5, slide 1 and a non-zero start offset of 2 - TODO confirm against SlidingCountPreReducer.
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 5, 1, 2);
+		// Two elements are stored and evicted before the first emit; afterwards an emit follows every store.
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.emitWindow(collector);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(7);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		// Sums grow to 1+..+5 = 15, then slide by one element per emit (20, 25, 30).
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1));
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(25));
+		expected.add(StreamWindow.fromElements(30));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+		// Reduce function handed to the pre-reducers in these tests: returns the sum of its two arguments.
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
new file mode 100644
index 0000000..bc3b13b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimePreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 3, 2 are presumably window size and slide in time units; the wrapper's 1 is presumably the start time - TODO confirm.
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+		// Each element's timestamp is its own value; same store/evict/emit sequence as the count-based testPreReduce1.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One single-value window per emit: the sum of the stored elements left after dropping the oldest evict(n) ones.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(27));
+		expected.add(StreamWindow.fromElements(33));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 5, 2 are presumably window size and slide in time units; the wrapper's 1 is presumably the start time - TODO confirm.
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+		// Each element's timestamp is its own value; the first evict only happens once six elements have been stored.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One single-value window per emit: 1+2, 1+..+4, then the sum of the five most recently stored elements.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(30));
+		expected.add(StreamWindow.fromElements(40));
+		expected.add(StreamWindow.fromElements(50));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 6, 3 are presumably window size and slide in time units; the wrapper's 1 is presumably the start time - TODO confirm.
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+		// Each element's timestamp is its own value; the final evict(3)/store(13) come after the last emit and are not asserted on.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+		// One single-value window per emit: 1+2+3, 1+..+6, 4+..+9, 7+..+12.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(39));
+		expected.add(StreamWindow.fromElements(57));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Args 3, 2 are presumably window size and slide in time units; the wrapper's 1 is presumably the start time - TODO confirm.
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+		// Starts like testPreReduce1, then leaves gaps: after 8 only 14 and 21 are stored, with repeated emits and evicts in between.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(14);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.store(21);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		// NOTE(review): this store follows the last emitWindow, so it cannot affect the assertion below.
+		preReducer.store(9);
+		// 12 emitWindow calls above but only 9 expected windows: presumably an emit on an empty buffer adds nothing - confirm against SlidingPreReducer.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(21));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+		// Reduce function handed to the pre-reducers in these tests: returns the sum of its two arguments.
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 437fcd6..95aace0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -67,9 +67,6 @@ public class TumblingGroupedPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
-
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
@@ -77,9 +74,6 @@ public class TumblingGroupedPreReducerTest {
 		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
 				new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
 
-		// Test automatic eviction
-		assertEquals(0, wb.size());
-
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
@@ -89,7 +83,6 @@ public class TumblingGroupedPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(3)));
 
-		assertEquals(4, wb.size());
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index 31be227..ddaf747 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -58,17 +58,12 @@ public class TumblingPreReducerTest {
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
 		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
 				collected.get(0));
 
-		// Test automatic eviction
-		assertEquals(0, wb.size());
-
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
@@ -78,7 +73,6 @@ public class TumblingPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(3)));
 
-		assertEquals(4, wb.size());
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());