You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/01 10:32:54 UTC

[1/2] flink git commit: [FLINK-1662] [streaming] Triggering on fake element bugfix

Repository: flink
Updated Branches:
  refs/heads/master d8c74d209 -> c63580244


[FLINK-1662] [streaming] Triggering on fake element bugfix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a3ae03b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a3ae03b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a3ae03b

Branch: refs/heads/master
Commit: 1a3ae03baff7cc3a406e2e63e7d57b3f7c71ea26
Parents: d8c74d2
Author: szape <ne...@gmail.com>
Authored: Wed Mar 25 17:19:17 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 1 09:19:08 2015 +0200

----------------------------------------------------------------------
 .../api/invokable/operator/windowing/StreamDiscretizer.java        | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a3ae03b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index 5403cc1..d4776e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -92,6 +92,8 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 	 */
 	protected synchronized void processRealElement(IN input) throws Exception {
 
+		// Setting the input element in order to avoid NullFieldException when triggering on fake element
+		windowEvent.setElement(input);
 		if (isActiveTrigger) {
 			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
 			Object[] result = trigger.preNotifyTrigger(input);


[2/2] flink git commit: [FLINK-1797] Add jumping pre-reducer for Count and Time windows

Posted by gy...@apache.org.
[FLINK-1797] Add jumping pre-reducer for Count and Time windows

Closes #549


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6358024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6358024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6358024

Branch: refs/heads/master
Commit: c6358024454cd8225cf27a91db7f64ffa13189ee
Parents: 1a3ae03
Author: Gabor Gevay <gg...@gmail.com>
Authored: Tue Mar 31 05:36:30 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 1 09:43:13 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  31 ++++
 .../environment/StreamContextEnvironment.java   |   2 +-
 .../api/environment/StreamPlanEnvironment.java  |   2 +-
 .../streaming/api/windowing/WindowUtils.java    |  26 ++++
 .../JumpingCountGroupedPreReducer.java          |  53 +++++++
 .../windowbuffer/JumpingCountPreReducer.java    |  55 +++++++
 .../JumpingTimeGroupedPreReducer.java           |  55 +++++++
 .../windowbuffer/JumpingTimePreReducer.java     |  57 +++++++
 .../SlidingCountGroupedPreReducer.java          |   3 -
 .../windowbuffer/SlidingCountPreReducer.java    |   3 -
 .../windowbuffer/SlidingGroupedPreReducer.java  |   4 +
 .../windowbuffer/SlidingPreReducer.java         |   4 +
 .../SlidingTimeGroupedPreReducer.java           |   2 +-
 .../windowbuffer/SlidingTimePreReducer.java     |   3 +-
 .../windowbuffer/TumblingPreReducer.java        |   2 +-
 .../streaming/api/AggregationFunctionTest.java  |   3 +-
 .../JumpingCountGroupedPreReducerTest.java      | 156 +++++++++++++++++++
 .../JumpingCountPreReducerTest.java             | 107 +++++++++++++
 .../windowbuffer/JumpingTimePreReducerTest.java |  96 ++++++++++++
 19 files changed, 652 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 41d46bb..027f318 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -65,6 +65,10 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedP
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
@@ -525,6 +529,33 @@ public class WindowedDataStream<OUT> {
 							WindowUtils.getTimeStampWrapper(trigger));
 				}
 
+			} else if(WindowUtils.isJumpingCountPolicy(trigger, eviction)){
+				if(groupByKey == null){
+					return new JumpingCountPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(), getType()
+							.createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
+				} else {
+					return new JumpingCountGroupedPreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(),
+							groupByKey,
+							getType().createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
+				}
+			} else if(WindowUtils.isJumpingTimePolicy(trigger, eviction)){
+				if(groupByKey == null) {
+					return new JumpingTimePreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
+							getType().createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger),
+							WindowUtils.getWindowSize(eviction),
+							WindowUtils.getTimeStampWrapper(trigger));
+				} else {
+					return new JumpingTimeGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
+							groupByKey,
+							getType().createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger),
+							WindowUtils.getWindowSize(eviction),
+							WindowUtils.getTimeStampWrapper(trigger));
+				}
 			}
 		}
 		return new BasicWindowBuffer<OUT>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 84ad710..5ca6ddd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -45,7 +45,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		} else {
 			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
 					ConfigConstants.DEFAULT_PARALLELISM));
 			// then for new
 			setParallelism(GlobalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 02fccd0..9b6b933 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -39,7 +39,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 		} else {
 			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
 					ConfigConstants.DEFAULT_PARALLELISM));
 			// then for new
 			setParallelism(GlobalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 034c7d7..944a478 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -162,4 +162,30 @@ public class WindowUtils {
 		}
 
 	}
+
+	public static boolean isJumpingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		if (isCountOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide > window
+					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
+					.getStart()
+					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
+		} else {
+			return false;
+		}
+	}
+
+	public static boolean isJumpingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		if (isTimeOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide > window
+					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
new file mode 100644
index 0000000..1f7c83e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long countToSkip; // How many elements should be jumped over
+	private long skipped = 0; // How many elements have we skipped since the last emitWindow
+
+	public JumpingCountGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
+										TypeSerializer<T> serializer, long countToSkip) {
+		super(reducer, keySelector, serializer);
+		this.countToSkip = countToSkip;
+	}
+
+	@Override
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		super.emitWindow(collector);
+		skipped = 0;
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if(skipped == countToSkip){
+			super.store(element);
+		} else {
+			skipped++;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
new file mode 100644
index 0000000..355d0ce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Non-grouped pre-reducer for jumping count eviction policy
+ * (the policies are based on count, and the slide size is larger than the window size).
+ */
+public class JumpingCountPreReducer<T> extends TumblingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long countToSkip; // How many elements should be jumped over
+	private long skipped = 0; // How many elements have we skipped since the last emitWindow
+
+	public JumpingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer, long countToSkip){
+		super(reducer, serializer);
+		this.countToSkip = countToSkip;
+	}
+
+	@Override
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		super.emitWindow(collector);
+		skipped = 0;
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if(skipped == countToSkip){
+			super.store(element);
+		} else {
+			skipped++;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
new file mode 100644
index 0000000..f2386a8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.util.Collector;
+
+public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private TimestampWrapper<T> timestampWrapper;
+	protected long windowStartTime;
+	private long slideSize;
+
+	public JumpingTimeGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
+										TypeSerializer<T> serializer,
+										long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
+		super(reducer, keySelector, serializer);
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
+		this.slideSize = slideSize;
+	}
+
+	@Override
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		super.emitWindow(collector);
+		windowStartTime += slideSize;
+	}
+
+	public void store(T element) throws Exception {
+		if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
+			super.store(element);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
new file mode 100644
index 0000000..98c264d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.util.Collector;
+
+/**
+ * Non-grouped pre-reducer for jumping time eviction policy
+ * (the policies are based on time, and the slide size is larger than the window size).
+ */
+public class JumpingTimePreReducer<T> extends TumblingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private TimestampWrapper<T> timestampWrapper;
+	protected long windowStartTime;
+	private long slideSize;
+
+	public JumpingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+								long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
+		super(reducer, serializer);
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
+		this.slideSize = slideSize;
+	}
+
+	@Override
+	public void emitWindow(Collector<StreamWindow<T>> collector) {
+		super.emitWindow(collector);
+		windowStartTime += slideSize;
+	}
+
+	public void store(T element) throws Exception {
+		if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
+			super.store(element);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
index 48bf1b6..8d690cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
@@ -21,9 +21,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 
-/**
- * Non-grouped pre-reducer for tumbling eviction policy.
- */
 public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
index a1216f3..db14eb0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -20,9 +20,6 @@ package org.apache.flink.streaming.api.windowing.windowbuffer;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-/**
- * Non-grouped pre-reducer for tumbling eviction policy.
- */
 public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
index aa1d76c..0872c6e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -28,6 +28,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
+/**
+ * Grouped pre-reducer for sliding eviction policy
+ * (the slide size is smaller than the window size).
+ */
 public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 8b9558f..0446b38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -24,6 +24,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Non-grouped pre-reducer for sliding eviction policy
+ * (the slide size is smaller than the window size).
+ */
 public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
index 1c293af..3724ce5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
- * Non-grouped pre-reducer for tumbling eviction policy.
+ * Grouped pre-reducer for sliding time eviction policy.
  */
 public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index bf3ec98..2382b48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -22,7 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
- * Non-grouped pre-reducer for tumbling eviction policy.
+ * Non-grouped pre-reducer for sliding time eviction policy
+ * (the policies are based on time, and the slide size is smaller than the window size).
  */
 public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index e56e556..d08d207 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
 /**
- * Non-grouped pre-reducer for tumbling eviction policy.
+ * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the same as the window size).
  */
 public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index bc0023a..54a7692 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -430,7 +430,8 @@ public class AggregationFunctionTest {
 	}
 
 	public static class MyPojo implements Serializable {
-
+		
+		private static final long serialVersionUID = 1L;
 		public int f0;
 		public int f1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
new file mode 100644
index 0000000..c91910b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.junit.Test;
+
+public class JumpingCountGroupedPreReducerTest {
+	// Exercises JumpingCountGroupedPreReducer through the WindowBuffer store/evict/emitWindow calls.
+	TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
+			.getForObject(new Tuple2<Integer, Integer>(1, 1));
+	TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
+	// Key selector built for tuple field index 0; it is handed to the pre-reducer in both tests.
+	KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
+			new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
+	// Reducer that also records every second argument it is called with (see Reducer below).
+	Reducer reducer = new Reducer();
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testEmitWindow() throws Exception {
+
+		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
+		inputs.add(new Tuple2<Integer, Integer>(1, 1));
+		inputs.add(new Tuple2<Integer, Integer>(0, 0));
+		inputs.add(new Tuple2<Integer, Integer>(1, -1));
+		inputs.add(new Tuple2<Integer, Integer>(1, -2));
+		inputs.add(new Tuple2<Integer, Integer>(100, -200));
+
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
+		// Last constructor argument is 1; the assertions below expect the first element stored for each window to be missing from it.
+		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
+				reducer, key, serializer, 1);
+
+		wb.store(serializer.copy(inputs.get(4)));
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.emitWindow(collector);
+
+		assertEquals(1, collected.size());
+		// (100,-200) was stored first and is not expected in the window; (1,1) and (0,0) have different f0 values.
+		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
+				new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
+
+		wb.store(serializer.copy(inputs.get(4)));
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.store(serializer.copy(inputs.get(2)));
+
+		// Nothing should happen here: the second window asserted below still reflects elements stored before this call
+		wb.evict(3);
+
+		wb.store(serializer.copy(inputs.get(3)));
+
+		wb.emitWindow(collector);
+
+		assertEquals(2, collected.size());
+		// f0 == 1: (1,1) + (1,-1) + (1,-2) = (3,-2); f0 == 0: (0,0) alone; (100,-200) is again absent.
+		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
+				new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
+
+		// Test whether function is mutating inputs or not: reduce ran twice in total and its recorded second arguments must equal the original inputs
+		assertEquals(2, reducer.allInputs.size());
+		assertEquals(reducer.allInputs.get(0), inputs.get(2));
+		assertEquals(reducer.allInputs.get(1), inputs.get(3));
+
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testEmitWindow2() throws Exception {
+		// Same five inputs, but the buffer is obtained via sequentialID() and evict is never called.
+		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
+		inputs.add(new Tuple2<Integer, Integer>(1, 1));
+		inputs.add(new Tuple2<Integer, Integer>(0, 0));
+		inputs.add(new Tuple2<Integer, Integer>(1, -1));
+		inputs.add(new Tuple2<Integer, Integer>(1, -2));
+		inputs.add(new Tuple2<Integer, Integer>(100, -200));
+
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
+
+		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
+				reducer, key, serializer, 1).sequentialID();
+
+		wb.store(serializer.copy(inputs.get(4)));
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.emitWindow(collector);
+
+		assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
+
+		wb.store(serializer.copy(inputs.get(4)));
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.store(serializer.copy(inputs.get(2)));
+		wb.emitWindow(collector);
+		// f0 == 1: (1,1) + (1,-1) = (2,0); f0 == 0: (0,0) alone.
+		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
+
+
+	}
+	// Compares the two collections as HashSets, so element order is ignored.
+	private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
+		assertEquals(new HashSet<T>(first), new HashSet<T>(second));
+	}
+
+	@SuppressWarnings("serial")
+	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+		// Every value2 passed to reduce, in call order.
+		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
+		// Records value2, adds both of its fields onto value1 in place and returns value1.
+		@Override
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
+											   Tuple2<Integer, Integer> value2) throws Exception {
+			allInputs.add(value2);
+			value1.f0 = value1.f0 + value2.f0;
+			value1.f1 = value1.f1 + value2.f1;
+			return value1;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
new file mode 100644
index 0000000..ba890ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class JumpingCountPreReducerTest {
+	// Exercises JumpingCountPreReducer through the WindowBuffer store/evict/emitWindow calls.
+	TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
+			new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
+	// Reducer that also records every second argument it is called with (see Reducer below).
+	Reducer reducer = new Reducer();
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testEmitWindow() throws Exception {
+
+		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
+		inputs.add(new Tuple2<Integer, Integer>(1, 1));
+		inputs.add(new Tuple2<Integer, Integer>(2, 0));
+		inputs.add(new Tuple2<Integer, Integer>(3, -1));
+		inputs.add(new Tuple2<Integer, Integer>(4, -2));
+		inputs.add(new Tuple2<Integer, Integer>(5, -3));
+
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
+		// Last constructor argument is 2; the assertions below expect the first two elements stored for each window to be left out of the result.
+		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(
+				reducer, serializer, 2);
+
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.store(serializer.copy(inputs.get(2)));
+		wb.store(serializer.copy(inputs.get(3)));
+		wb.store(serializer.copy(inputs.get(4)));
+
+		wb.emitWindow(collector);
+		// (3,-1) + (4,-2) + (5,-3) = (12,-6): only the last three of the five stored elements are summed.
+		assertEquals(1, collected.size());
+		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(12, -6)),
+				collected.get(0));
+
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.store(serializer.copy(inputs.get(2)));
+
+		// Nothing should happen here: the second window asserted below still reflects an element stored before this call
+		wb.evict(3);
+
+		wb.store(serializer.copy(inputs.get(3)));
+
+		wb.emitWindow(collector);
+		// (3,-1) + (4,-2) = (7,-3): again the first two stored elements are not part of the result.
+		assertEquals(2, collected.size());
+		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(7, -3)),
+				collected.get(1));
+
+		// Test whether function is mutating inputs or not: reduce ran three times in total and each recorded second argument must equal its original input
+		assertEquals(3, reducer.allInputs.size());
+		assertEquals(reducer.allInputs.get(0), inputs.get(3));
+		assertEquals(reducer.allInputs.get(1), inputs.get(4));
+		assertEquals(reducer.allInputs.get(2), inputs.get(3));
+	}
+
+	@SuppressWarnings("serial")
+	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+		// Every value2 passed to reduce, in call order.
+		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
+		// Records value2, adds both of its fields onto value1 in place and returns value1.
+		@Override
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
+											   Tuple2<Integer, Integer> value2) throws Exception {
+			allInputs.add(value2);
+			value1.f0 = value1.f0 + value2.f0;
+			value1.f1 = value1.f1 + value2.f1;
+			return value1;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6358024/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
new file mode 100644
index 0000000..5b693e7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class JumpingTimePreReducerTest {
+	// Exercises JumpingTimePreReducer through the WindowBuffer store/evict/emitWindow calls.
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+	// Sums its two arguments (see SumReducer below).
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	@Test
+	public void testEmitWindow() throws Exception {
+
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		List<StreamWindow<Integer>> collected = collector.getCollected();
+		// Each Integer is its own timestamp (getTimestamp returns the value). NOTE(review): the meaning of the 3 and 2 arguments is not visible here - confirm against JumpingTimePreReducer.
+		WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(
+				reducer, serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+		}, 1));
+
+		wb.store(1);
+		wb.store(2);
+		wb.store(3);
+		wb.evict(1);
+		wb.emitWindow(collector);
+		// Expected result is 2 + 3 = 5, i.e. the element 1 is not part of the first window.
+		assertEquals(1, collected.size());
+		assertEquals(StreamWindow.fromElements(5),
+				collected.get(0));
+
+		wb.store(4);
+		wb.store(5);
+
+		// Nothing should happen here: the window asserted below (11) still includes the 5 stored before this call
+		wb.evict(2);
+
+		wb.store(6);
+
+		wb.emitWindow(collector);
+		wb.evict(2);
+		wb.emitWindow(collector);
+		wb.store(12);
+		wb.emitWindow(collector);
+		// Four emitWindow calls in total but only three windows are expected: 5, then 5 + 6 = 11 (4 is left out), then 12.
+		assertEquals(3, collected.size());
+		assertEquals(StreamWindow.fromElements(11),
+				collected.get(1));
+		assertEquals(StreamWindow.fromElements(12),
+				collected.get(2));
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+		// Returns value1 + value2 without modifying either argument.
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+}