You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:11 UTC

[1/5] flink git commit: [FLINK-1643] [streaming] Auto detection of tumbling policies added + WindowedDataStream refactor

Repository: flink
Updated Branches:
  refs/heads/master 2bba2b3f0 -> 2522f028b


[FLINK-1643] [streaming] Auto detection of tumbling policies added + WindowedDataStream refactor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aacd4f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aacd4f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aacd4f29

Branch: refs/heads/master
Commit: aacd4f2916e6d7cc79b0891609d6485b1ec72ef2
Parents: 70abc16
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Mar 8 14:29:30 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      | 57 +++++-------
 .../windowing/ParallelWindowPartitioner.java    | 84 +++++++++++++++++
 .../windowing/WindowBufferInvokable.java        |  3 -
 .../streaming/api/windowing/StreamWindow.java   |  4 +
 .../streaming/api/windowing/WindowUtils.java    | 93 +++++++++++++++++++
 .../streaming/api/windowing/helper/Time.java    | 17 +---
 .../windowing/policy/CountEvictionPolicy.java   |  4 +
 .../windowing/policy/CountTriggerPolicy.java    |  4 +
 .../api/windowing/policy/TimeTriggerPolicy.java | 39 ++------
 .../windowbuffer/SlidingCountPreReducer.java    | 25 ++---
 .../windowbuffer/SlidingPreReducer.java         | 96 ++++++++++++--------
 .../windowbuffer/SlidingTimePreReducer.java     | 11 +--
 .../windowing/StreamDiscretizerTest.java        |  2 +-
 13 files changed, 300 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 4c7da88..6ce9f9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,14 +46,11 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.WindowUtils;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
@@ -261,7 +258,7 @@ public class WindowedDataStream<OUT> {
 	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
 
 		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
-				.with(reduceFunction);
+				.with(clean(reduceFunction));
 
 		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
 
@@ -288,7 +285,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
-		return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
+		return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
 				new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction);
 	}
 
@@ -370,42 +367,36 @@ public class WindowedDataStream<OUT> {
 
 	@SuppressWarnings("unchecked")
 	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
+		TriggerPolicy<OUT> trigger = getTrigger();
+		EvictionPolicy<OUT> eviction = getEviction();
 
 		if (transformation == WindowTransformation.REDUCEWINDOW) {
-			if (getTrigger() instanceof TumblingEvictionPolicy) {
+			if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
 				if (groupByKey == null) {
 					return new TumblingPreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+							(ReduceFunction<OUT>) transformation.getUDF(), getType()
 									.createSerializer(getExecutionConfig()));
 				} else {
 					return new TumblingGroupedPreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
-							getType().createSerializer(getExecutionConfig()));
-				}
-			} else if (getTrigger() instanceof CountTriggerPolicy
-					&& getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
-
-				int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
-				int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
-				int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
-				if (slide < window) {
-					return new SlidingCountPreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
-									.getType().createSerializer(getExecutionConfig()), window,
-							slide, start);
-				}
-			} else if (getTrigger() instanceof TimeTriggerPolicy
-					&& getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
-				int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
-				int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
-				TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
-						.getTimeStampWrapper();
-				if (slide < window) {
-					return new SlidingTimePreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
-									.getType().createSerializer(getExecutionConfig()), window,
-							slide, wrapper);
+							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+									.createSerializer(getExecutionConfig()));
 				}
+			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
+
+				return new SlidingCountPreReducer<OUT>(
+						clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
+								.createSerializer(getExecutionConfig()),
+						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+						((CountTriggerPolicy<?>) trigger).getStart());
+
+			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+
+				return new SlidingTimePreReducer<OUT>(
+						(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+								.createSerializer(getExecutionConfig()),
+						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+						WindowUtils.getTimeStampWrapper(trigger));
+
 			}
 		}
 		return new BasicWindowBuffer<OUT>();

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
new file mode 100644
index 0000000..32778da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+/**
+ * This invokable applies either split or key partitioning depending on the
+ * transformation.
+ */
+public class ParallelWindowPartitioner<T> extends
+		ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
+
+	private KeySelector<T, ?> keySelector;
+	private int numberOfSplits;
+	private int currentWindowID = 0;
+
+	public ParallelWindowPartitioner(KeySelector<T, ?> keySelector) {
+		super(null);
+		this.keySelector = keySelector;
+	}
+
+	public ParallelWindowPartitioner(int numberOfSplits) {
+		super(null);
+		this.numberOfSplits = numberOfSplits;
+	}
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void invoke() throws Exception {
+		while (isRunning && readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		StreamWindow<T> currentWindow = nextObject;
+		currentWindow.setID(++currentWindowID);
+
+		if (keySelector == null) {
+			if (numberOfSplits <= 1) {
+				collector.collect(currentWindow);
+			} else {
+				for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
+					collector.collect(window);
+				}
+			}
+		} else {
+
+			for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
+				collector.collect(window);
+			}
+
+		}
+	}
+
+	@Override
+	public void collect(StreamWindow<T> record) {
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 72be823..75f7d9d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,13 +52,10 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
 	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
 			throws Exception {
 		if (windowEvent.isElement()) {
-			System.out.println("element: " + windowEvent.getElement());
 			buffer.store(windowEvent.getElement());
 		} else if (windowEvent.isEviction()) {
-			System.out.println("eviction: " + windowEvent.getEviction());
 			buffer.evict(windowEvent.getEviction());
 		} else if (windowEvent.isTrigger()) {
-			System.out.println("trigger");
 			buffer.emitWindow(collector);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index 49b40f5..b45babb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -172,6 +172,10 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		return this;
 	}
 
+	public void setID(int id) {
+		this.windowID = id;
+	}
+
 	/**
 	 * Checks whether this window can be merged with the given one.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 1e3ba86..246aff2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -48,10 +49,102 @@ public class WindowUtils {
 				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
 	}
 
+	public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		if (isTimeOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide < window
+					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
+		} else {
+			return false;
+		}
+	}
+
+	public static boolean isSlidingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		if (isCountOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide < window
+					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
+							.getStart()
+					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
+		} else {
+			return false;
+		}
+	}
+
+	public static <X> TimestampWrapper<X> getTimeStampWrapper(TriggerPolicy<X> trigger) {
+		if (trigger instanceof TimeTriggerPolicy) {
+			return ((TimeTriggerPolicy<X>) trigger).getTimeStampWrapper();
+		} else {
+			throw new IllegalArgumentException(
+					"Timestamp wrapper can only be accessed for time policies");
+		}
+	}
+
+	public static <X> TimestampWrapper<X> getTimeStampWrapper(EvictionPolicy<X> eviction) {
+		if (eviction instanceof TimeEvictionPolicy) { // only time-based evictions expose a wrapper
+			return ((TimeEvictionPolicy<X>) eviction).getTimeStampWrapper();
+		} else { // any other eviction (or null) is rejected instead of failing in the cast above
+			throw new IllegalArgumentException(
+					"Timestamp wrapper can only be accessed for time policies");
+		}
+	}
+
+	public static long getSlideSize(TriggerPolicy<?> trigger) {
+		if (trigger instanceof TimeTriggerPolicy) {
+			return ((TimeTriggerPolicy<?>) trigger).getSlideSize();
+		} else if (trigger instanceof CountTriggerPolicy) {
+			return ((CountTriggerPolicy<?>) trigger).getSlideSize();
+		} else {
+			throw new IllegalArgumentException(
+					"Slide size can only be accessed for time or count policies");
+		}
+	}
+
+	public static long getWindowSize(EvictionPolicy<?> eviction) {
+		if (eviction instanceof TimeEvictionPolicy) {
+			return ((TimeEvictionPolicy<?>) eviction).getWindowSize();
+		} else if (eviction instanceof CountEvictionPolicy) {
+			return ((CountEvictionPolicy<?>) eviction).getWindowSize();
+		} else {
+			throw new IllegalArgumentException(
+					"Window size can only be accessed for time or count policies");
+		}
+	}
+
+	public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		if (eviction instanceof TumblingEvictionPolicy) {
+			return true;
+		} else if (isTimeOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide == window
+					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
+		} else if (isCountOnly(trigger, eviction)) {
+			long slide = getSlideSize(trigger);
+			long window = getWindowSize(eviction);
+
+			return slide == window
+					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
+							.getStart()
+					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
+		} else {
+			return false;
+		}
+	}
+
 	public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
 		return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
 	}
 
+	public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		return trigger instanceof CountTriggerPolicy && eviction instanceof CountEvictionPolicy;
+	}
+
 	public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
 		return trigger instanceof TimeTriggerPolicy
 				&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index f94eea4..da8d929 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -80,7 +80,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 		this.length = length;
 		this.granularity = timeUnit;
 		this.timestampWrapper = timestampWrapper;
-		this.delay = 0;
 	}
 
 	@Override
@@ -90,7 +89,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	@Override
 	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper);
 	}
 
 	/**
@@ -147,19 +146,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
 		return of(length, timestamp, 0);
 	}
-
-	/**
-	 * Sets the delay for the first processed window.
-	 * 
-	 * @param delay
-	 *            The number of time units before the first processed window.
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public Time<DATA> withDelay(long delay) {
-		this.delay = delay;
-		return this;
-	}
-
+	
 	protected long granularityInMillis() {
 		return granularity == null ? length : granularity.toMillis(length);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 07478c6..5e0c862 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -143,6 +143,10 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
 	public int getStart() {
 		return startValue;
 	}
+	
+	public int getDeleteOnEviction(){
+		return deleteOnEviction;
+	}
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index d439f72..9bd6f82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -105,6 +105,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
 	public int getSlideSize() {
 		return max;
 	}
+	
+	public int getStart() {
+		return startValue;
+	}
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ce8f16e..4c8a942 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -43,30 +43,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	protected long startTime;
 	public long granularity;
 	public TimestampWrapper<DATA> timestampWrapper;
-	protected long delay;
-
-	/**
-	 * This trigger policy triggers with regard to the time. The is measured
-	 * using a given {@link Timestamp} implementation. A point in time is always
-	 * represented as long. Therefore, parameters such as granularity can be set
-	 * as long value as well. If this value for the granularity is set to 2 for
-	 * example, the policy will trigger at every second point in time.
-	 * 
-	 * @param granularity
-	 *            The granularity of the trigger. If this value is set to x the
-	 *            policy will trigger at every x-th time point
-	 * @param timestampWrapper
-	 *            The {@link TimestampWrapper} to measure the time with. This
-	 *            can be either user defined of provided by the API.
-	 * @param timeWrapper
-	 *            This policy creates fake elements to not miss windows in case
-	 *            no element arrived within the duration of the window. This
-	 *            extractor should wrap a long into such an element of type
-	 *            DATA.
-	 */
-	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
-		this(granularity, timestampWrapper, 0);
-	}
 
 	/**
 	 * This is mostly the same as
@@ -81,21 +57,16 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * @param timestampWrapper
 	 *            The {@link TimestampWrapper} to measure the time with. This
 	 *            can be either user defined of provided by the API.
-	 * @param delay
-	 *            A delay for the first trigger. If the start time given by the
-	 *            timestamp is x, the delay is y, and the granularity is z, the
-	 *            first trigger will happen at x+y+z.
 	 * @param timeWrapper
 	 *            This policy creates fake elements to not miss windows in case
 	 *            no element arrived within the duration of the window. This
 	 *            extractor should wrap a long into such an element of type
 	 *            DATA.
 	 */
-	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper, long delay) {
-		this.startTime = timestampWrapper.getStartTime() + delay;
+	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
+		this.startTime = timestampWrapper.getStartTime();
 		this.timestampWrapper = timestampWrapper;
 		this.granularity = granularity;
-		this.delay = delay;
 	}
 
 	/**
@@ -194,7 +165,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	@Override
 	public TimeTriggerPolicy<DATA> clone() {
-		return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper, delay);
+		return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper);
 	}
 
 	@Override
@@ -222,5 +193,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
 				+ ")";
 	}
+	
+	public TimestampWrapper<DATA> getTimeStampWrapper() {
+		return timestampWrapper;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
index ff358cb..a1216f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -27,12 +27,14 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private int windowSize;
-	private int slideSize;
+	private long windowSize;
+	private long slideSize;
 	private int start;
 
+	protected long index = 0;
+
 	public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			int windowSize, int slideSize, int start) {
+			long windowSize, long slideSize, int start) {
 		super(reducer, serializer);
 		if (windowSize > slideSize) {
 			this.windowSize = windowSize;
@@ -46,8 +48,8 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
 	}
 
 	@Override
-	public SlidingCountPreReducer<T> clone() {
-		return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+	protected void afterStore() {
+		index++;
 	}
 
 	@Override
@@ -60,12 +62,7 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
 	}
 
 	@Override
-	public String toString() {
-		return currentReduced.toString();
-	}
-
-	@Override
-	protected boolean addCurrentToReduce(T next) {
+	protected boolean currentEligible(T next) {
 		if (index <= slideSize) {
 			return true;
 		} else {
@@ -74,10 +71,14 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
 	}
 
 	@Override
-	protected void updateIndexAtEmit() {
+	protected void afterEmit() {
 		if (index >= slideSize) {
 			index = index - slideSize;
 		}
 	}
 
+	@Override
+	public SlidingCountPreReducer<T> clone() {
+		return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index a21bb61..1dd126d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -36,7 +36,6 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 
 	protected TypeSerializer<T> serializer;
 
-	protected int index = 0;
 	protected int toRemove = 0;
 
 	protected int elementsSinceLastPreAggregate = 0;
@@ -48,64 +47,87 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 
 	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
 		StreamWindow<T> currentWindow = new StreamWindow<T>();
-		T finalAggregate = getFinalAggregate();
-		if (finalAggregate != null) {
-			currentWindow.add(finalAggregate);
-			collector.collect(currentWindow);
-			updateIndexAtEmit();
-			return true;
-		} else {
-			updateIndexAtEmit();
-			return false;
+
+		try {
+			if (addFinalAggregate(currentWindow)) {
+				collector.collect(currentWindow);
+				afterEmit();
+				return true;
+			} else {
+				afterEmit();
+				return false;
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
 		}
 
 	}
 
-	protected abstract void updateIndexAtEmit();
+	protected void afterEmit() {
+		// Do nothing by default
+	}
 
-	public T getFinalAggregate() {
-		try {
-			if (!reduced.isEmpty()) {
-				T finalReduce = reduced.get(0);
-				for (int i = 1; i < reduced.size(); i++) {
-					finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
-
-				}
-				if (currentReduced != null) {
-					finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
-				}
-				return finalReduce;
-			} else {
-				return currentReduced;
+	public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+		T finalReduce = null;
+
+		if (!reduced.isEmpty()) {
+			finalReduce = reduced.get(0);
+			for (int i = 1; i < reduced.size(); i++) {
+				finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+			}
+			if (currentReduced != null) {
+				finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
 			}
 
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+		} else {
+			finalReduce = currentReduced;
 		}
+
+		if (finalReduce != null) {
+			currentWindow.add(finalReduce);
+			return true;
+		} else {
+			return false;
+		}
+
 	}
 
 	public void store(T element) throws Exception {
 		addToBufferIfEligible(element);
-		index++;
+		afterStore();
+	}
+
+	protected void afterStore() {
+		// Do nothing by default
 	}
 
 	protected void addToBufferIfEligible(T element) throws Exception {
-		if (addCurrentToReduce(element) && currentReduced != null) {
-			reduced.add(currentReduced);
+		if (currentEligible(element) && currentReduced != null) {
+			addCurrentToBuffer(element);
 			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
-			currentReduced = element;
 			elementsSinceLastPreAggregate = 1;
 		} else {
-			if (currentReduced == null) {
-				currentReduced = element;
-			} else {
-				currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
-			}
+			updateCurrent(element);
+
 			elementsSinceLastPreAggregate++;
 		}
 	}
 
-	protected abstract boolean addCurrentToReduce(T next);
+	protected void updateCurrent(T element) throws Exception {
+		if (currentReduced == null) {
+			currentReduced = element;
+		} else {
+			currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+		}
+	}
+
+	protected void addCurrentToBuffer(T element) {
+		reduced.add(currentReduced);
+		currentReduced = element;
+	}
+
+	protected abstract boolean currentEligible(T next);
 
 	public void evict(int n) {
 		toRemove += n;

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index af2239c..bf3ec98 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -28,14 +28,14 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private int windowSize;
-	private int slideSize;
+	private long windowSize;
+	private long slideSize;
 	private TimestampWrapper<T> timestampWrapper;
 	private T lastStored;
 	protected long windowStartTime;
 
 	public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+			long windowSize, long slideSize, TimestampWrapper<T> timestampWrapper) {
 		super(reducer, serializer);
 		if (windowSize > slideSize) {
 			this.windowSize = windowSize;
@@ -66,8 +66,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 	}
 
 	@Override
-	protected void updateIndexAtEmit() {
-		index = 0;
+	protected void afterEmit() {
 		long lastTime = timestampWrapper.getTimestamp(lastStored);
 		if (lastTime - windowStartTime >= slideSize) {
 			windowStartTime = windowStartTime + slideSize;
@@ -92,7 +91,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 	}
 
 	@Override
-	protected boolean addCurrentToReduce(T next) {
+	protected boolean currentEligible(T next) {
 		return windowStartTime == timestampWrapper.getStartTime()
 				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
index 0b7b51f..bd81c20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
@@ -72,7 +72,7 @@ public class StreamDiscretizerTest {
 		};
 
 		TriggerPolicy<Integer> trigger = new TimeTriggerPolicy<Integer>(2L,
-				new TimestampWrapper<Integer>(myTimeStamp, 1), 2L);
+				new TimestampWrapper<Integer>(myTimeStamp, 3));
 
 		EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
 				new TimestampWrapper<Integer>(myTimeStamp, 1));


[2/5] flink git commit: [FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count

Posted by gy...@apache.org.
[FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70abc16d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70abc16d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70abc16d

Branch: refs/heads/master
Commit: 70abc16df78d47c62b7cc7f1545542b330562113
Parents: 2be00ac
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 18:22:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  51 +++-
 .../windowing/policy/CountEvictionPolicy.java   |   8 +
 .../windowing/policy/CountTriggerPolicy.java    |   4 +
 .../windowing/policy/TimeEvictionPolicy.java    |   8 +
 .../api/windowing/policy/TimeTriggerPolicy.java |   4 +
 .../windowbuffer/BasicWindowBuffer.java         |   4 -
 .../windowbuffer/SlidingCountPreReducer.java    |  83 ++++++
 .../windowbuffer/SlidingPreReducer.java         | 141 ++++++++++
 .../windowbuffer/SlidingTimePreReducer.java     |  99 +++++++
 .../windowbuffer/TumblingGroupedPreReducer.java |   7 -
 .../windowbuffer/TumblingPreReducer.java        |   7 -
 .../windowing/windowbuffer/WindowBuffer.java    |   2 -
 .../windowing/WindowIntegrationTest.java        |  37 ++-
 .../windowbuffer/BasicWindowBufferTest.java     |   4 -
 .../SlidingCountPreReducerTest.java             | 216 ++++++++++++++++
 .../windowbuffer/SlidingTimePreReducerTest.java | 257 +++++++++++++++++++
 .../TumblingGroupedPreReducerTest.java          |   7 -
 .../windowbuffer/TumblingPreReducerTest.java    |   6 -
 18 files changed, 894 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 742ff22..4c7da88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,15 +46,21 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.WindowUtils;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
@@ -365,16 +371,41 @@ public class WindowedDataStream<OUT> {
 	@SuppressWarnings("unchecked")
 	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
 
-		if (transformation == WindowTransformation.REDUCEWINDOW
-				&& getEviction() instanceof TumblingEvictionPolicy) {
-			if (groupByKey == null) {
-				return new TumblingPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
-								.createSerializer(getExecutionConfig()));
-			} else {
-				return new TumblingGroupedPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
-								.createSerializer(getExecutionConfig()));
+		if (transformation == WindowTransformation.REDUCEWINDOW) {
+			if (getTrigger() instanceof TumblingEvictionPolicy) {
+				if (groupByKey == null) {
+					return new TumblingPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+									.createSerializer(getExecutionConfig()));
+				} else {
+					return new TumblingGroupedPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
+							getType().createSerializer(getExecutionConfig()));
+				}
+			} else if (getTrigger() instanceof CountTriggerPolicy
+					&& getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
+
+				int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+				int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
+				int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
+				if (slide < window) {
+					return new SlidingCountPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), window,
+							slide, start);
+				}
+			} else if (getTrigger() instanceof TimeTriggerPolicy
+					&& getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
+				int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+				int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
+				TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
+						.getTimeStampWrapper();
+				if (slide < window) {
+					return new SlidingTimePreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), window,
+							slide, wrapper);
+				}
 			}
 		}
 		return new BasicWindowBuffer<OUT>();

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 3ede27b..07478c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -136,6 +136,14 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
 		}
 	}
 
+	public int getWindowSize() {
+		return maxElements;
+	}
+
+	public int getStart() {
+		return startValue;
+	}
+
 	@Override
 	public String toString() {
 		return "CountPolicy(" + maxElements + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index 6d8149a..d439f72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -102,6 +102,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
 		}
 	}
 
+	public int getSlideSize() {
+		return max;
+	}
+
 	@Override
 	public String toString() {
 		return "CountPolicy(" + max + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 982b6d5..ae17e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -150,10 +150,18 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		}
 	}
 
+	public long getWindowSize() {
+		return granularity;
+	}
+
 	@Override
 	public String toString() {
 		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
 				+ ")";
 	}
 
+	public TimestampWrapper<DATA> getTimeStampWrapper() {
+		return timestampWrapper;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 7065582..ce8f16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -213,6 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		}
 	}
 
+	public long getSlideSize() {
+		return granularity;
+	}
+
 	@Override
 	public String toString() {
 		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 458de41..8e39398 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -63,10 +63,6 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
 		}
 	}
 
-	public int size() {
-		return buffer.size();
-	}
-
 	@Override
 	public BasicWindowBuffer<T> clone() {
 		return new BasicWindowBuffer<T>();

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
new file mode 100644
index 0000000..ff358cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Non-grouped pre-reducer for sliding count windows (window size larger than slide size).
+ */
+public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private int windowSize;
+	private int slideSize;
+	private int start;
+
+	public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			int windowSize, int slideSize, int start) {
+		super(reducer, serializer);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+			this.start = start;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		index = index - start;
+	}
+
+	@Override
+	public SlidingCountPreReducer<T> clone() {
+		return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if (index >= 0) {
+			super.store(element);
+		} else {
+			index++;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+	@Override
+	protected boolean addCurrentToReduce(T next) {
+		if (index <= slideSize) {
+			return true;
+		} else {
+			return index == windowSize;
+		}
+	}
+
+	@Override
+	protected void updateIndexAtEmit() {
+		if (index >= slideSize) {
+			index = index - slideSize;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
new file mode 100644
index 0000000..a21bb61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+
+	private static final long serialVersionUID = 1L;
+
+	protected ReduceFunction<T> reducer;
+
+	protected T currentReduced;
+	protected LinkedList<T> reduced = new LinkedList<T>();
+	protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
+
+	protected TypeSerializer<T> serializer;
+
+	protected int index = 0;
+	protected int toRemove = 0;
+
+	protected int elementsSinceLastPreAggregate = 0;
+
+	public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+		this.reducer = reducer;
+		this.serializer = serializer;
+	}
+
+	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+		StreamWindow<T> currentWindow = new StreamWindow<T>();
+		T finalAggregate = getFinalAggregate();
+		if (finalAggregate != null) {
+			currentWindow.add(finalAggregate);
+			collector.collect(currentWindow);
+			updateIndexAtEmit();
+			return true;
+		} else {
+			updateIndexAtEmit();
+			return false;
+		}
+
+	}
+
+	protected abstract void updateIndexAtEmit();
+
+	public T getFinalAggregate() {
+		try {
+			if (!reduced.isEmpty()) {
+				T finalReduce = reduced.get(0);
+				for (int i = 1; i < reduced.size(); i++) {
+					finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+				}
+				if (currentReduced != null) {
+					finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
+				}
+				return finalReduce;
+			} else {
+				return currentReduced;
+			}
+
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void store(T element) throws Exception {
+		addToBufferIfEligible(element);
+		index++;
+	}
+
+	protected void addToBufferIfEligible(T element) throws Exception {
+		if (addCurrentToReduce(element) && currentReduced != null) {
+			reduced.add(currentReduced);
+			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
+			currentReduced = element;
+			elementsSinceLastPreAggregate = 1;
+		} else {
+			if (currentReduced == null) {
+				currentReduced = element;
+			} else {
+				currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+			}
+			elementsSinceLastPreAggregate++;
+		}
+	}
+
+	protected abstract boolean addCurrentToReduce(T next);
+
+	public void evict(int n) {
+		toRemove += n;
+
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			reduced.removeFirst();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (lastPreAggregateSize == null) {
+			toRemove = 0;
+		}
+	}
+
+	public int max(int a, int b) {
+		if (a > b) {
+			return a;
+		} else {
+			return b;
+		}
+	}
+
+	@Override
+	public abstract SlidingPreReducer<T> clone();
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
new file mode 100644
index 0000000..af2239c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for sliding time windows (window size larger than slide size).
+ */
+public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private int windowSize;
+	private int slideSize;
+	private TimestampWrapper<T> timestampWrapper;
+	private T lastStored;
+	protected long windowStartTime;
+
+	public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+		super(reducer, serializer);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime();
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		super.store(element);
+		lastStored = element;
+	}
+
+	@Override
+	public SlidingTimePreReducer<T> clone() {
+		return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
+				timestampWrapper);
+	}
+
+	@Override
+	public String toString() {
+		return currentReduced.toString();
+	}
+
+	@Override
+	protected void updateIndexAtEmit() {
+		index = 0;
+		long lastTime = timestampWrapper.getTimestamp(lastStored);
+		if (lastTime - windowStartTime >= slideSize) {
+			windowStartTime = windowStartTime + slideSize;
+		}
+	}
+
+	@Override
+	public void evict(int n) {
+		toRemove += n;
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			reduced.removeFirst();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (toRemove > 0 && lastPreAggregateSize == null) {
+			currentReduced = null;
+			toRemove = 0;
+		}
+	}
+
+	@Override
+	protected boolean addCurrentToReduce(T next) {
+		return windowStartTime == timestampWrapper.getStartTime()
+				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 7403ffe..9431a99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -38,7 +38,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 
 	private Map<Object, T> reducedValues;
 
-	private int numOfElements = 0;
 	private TypeSerializer<T> serializer;
 
 	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
@@ -56,7 +55,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 			currentWindow.addAll(reducedValues.values());
 			collector.collect(currentWindow);
 			reducedValues.clear();
-			numOfElements = 0;
 			return true;
 		} else {
 			return false;
@@ -76,16 +74,11 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
 		}
 
 		reducedValues.put(key, reduced);
-		numOfElements++;
 	}
 
 	public void evict(int n) {
 	}
 
-	public int size() {
-		return numOfElements;
-	}
-
 	@Override
 	public TumblingGroupedPreReducer<T> clone() {
 		return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 35cf60e..58b30a6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -32,7 +32,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 	private ReduceFunction<T> reducer;
 
 	private T reduced;
-	private int numOfElements = 0;
 	private TypeSerializer<T> serializer;
 
 	public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
@@ -46,7 +45,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 			currentWindow.add(reduced);
 			collector.collect(currentWindow);
 			reduced = null;
-			numOfElements = 0;
 			return true;
 		} else {
 			return false;
@@ -59,16 +57,11 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
 		} else {
 			reduced = reducer.reduce(serializer.copy(reduced), element);
 		}
-		numOfElements++;
 	}
 
 	public void evict(int n) {
 	}
 
-	public int size() {
-		return numOfElements;
-	}
-
 	@Override
 	public TumblingPreReducer<T> clone() {
 		return new TumblingPreReducer<T>(reducer, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index e0429ab..2dd50db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -34,8 +34,6 @@ public interface WindowBuffer<T> extends Serializable, Cloneable {
 
 	public boolean emitWindow(Collector<StreamWindow<T>> collector);
 
-	public int size();
-
 	public WindowBuffer<T> clone();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 25b5f87..bcfa188 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,6 +89,8 @@ public class WindowIntegrationTest implements Serializable {
 		inputs.add(11);
 		inputs.add(11);
 
+		KeySelector<Integer, ?> key = new ModKey(2);
+
 		Timestamp<Integer> ts = new Timestamp<Integer>() {
 
 			private static final long serialVersionUID = 1L;
@@ -103,14 +105,12 @@ public class WindowIntegrationTest implements Serializable {
 
 		DataStream<Integer> source = env.fromCollection(inputs);
 
-		source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new CentralSink1());
 
 		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
 				.flatten().addSink(new CentralSink2());
 
-		KeySelector<Integer, ?> key = new ModKey(2);
-
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new DistributedSink1());
 
@@ -126,12 +126,17 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new DistributedSink5());
+
 		env.execute();
 
-		// sum ( Time of 2 slide 3 )
+		// sum ( Time of 3 slide 2 )
 		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
 		expected1.add(StreamWindow.fromElements(5));
+		expected1.add(StreamWindow.fromElements(11));
 		expected1.add(StreamWindow.fromElements(9));
+		expected1.add(StreamWindow.fromElements(10));
 		expected1.add(StreamWindow.fromElements(32));
 
 		validateOutput(expected1, CentralSink1.windows);
@@ -193,6 +198,13 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected7, DistributedSink4.windows);
 
+		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+		expected8.add(StreamWindow.fromElements(12));
+		expected8.add(StreamWindow.fromElements(9));
+		expected8.add(StreamWindow.fromElements(32));
+
+		validateOutput(expected8, DistributedSink5.windows);
+
 	}
 
 	public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -317,4 +329,21 @@ public class WindowIntegrationTest implements Serializable {
 		}
 
 	}
+
+	@SuppressWarnings("serial")
+	private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+		@Override
+		public void cancel() {
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index bb756c7..967c719 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -41,8 +41,6 @@ public class BasicWindowBufferTest {
 		wb.store(2);
 		wb.store(10);
 
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
@@ -51,8 +49,6 @@ public class BasicWindowBufferTest {
 		wb.store(4);
 		wb.evict(2);
 
-		assertEquals(1, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
new file mode 100644
index 0000000..3ce65f1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 3, 2, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(27));
+		expected.add(StreamWindow.fromElements(33));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 5, 2, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(30));
+		expected.add(StreamWindow.fromElements(40));
+		expected.add(StreamWindow.fromElements(50));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 6, 3, 0);
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(39));
+		expected.add(StreamWindow.fromElements(57));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+				serializer, 5, 1, 2);
+
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.emitWindow(collector);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(7);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1));
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(25));
+		expected.add(StreamWindow.fromElements(30));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
new file mode 100644
index 0000000..bc3b13b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimePreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+	ReduceFunction<Integer> reducer = new SumReducer();
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(27));
+		expected.add(StreamWindow.fromElements(33));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(10));
+		expected.add(StreamWindow.fromElements(20));
+		expected.add(StreamWindow.fromElements(30));
+		expected.add(StreamWindow.fromElements(40));
+		expected.add(StreamWindow.fromElements(50));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(39));
+		expected.add(StreamWindow.fromElements(57));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+						return value;
+					}
+				}, 1));
+
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(14);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.store(21);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+
+		preReducer.store(9);
+
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(3));
+		expected.add(StreamWindow.fromElements(9));
+		expected.add(StreamWindow.fromElements(15));
+		expected.add(StreamWindow.fromElements(21));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(21));
+
+		assertEquals(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 437fcd6..95aace0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -67,9 +67,6 @@ public class TumblingGroupedPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
-
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
@@ -77,9 +74,6 @@ public class TumblingGroupedPreReducerTest {
 		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
 				new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
 
-		// Test automatic eviction
-		assertEquals(0, wb.size());
-
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
@@ -89,7 +83,6 @@ public class TumblingGroupedPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(3)));
 
-		assertEquals(4, wb.size());
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index 31be227..ddaf747 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -58,17 +58,12 @@ public class TumblingPreReducerTest {
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 
-		assertEquals(2, wb.size());
-
 		wb.emitWindow(collector);
 
 		assertEquals(1, collected.size());
 		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
 				collected.get(0));
 
-		// Test automatic eviction
-		assertEquals(0, wb.size());
-
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
@@ -78,7 +73,6 @@ public class TumblingPreReducerTest {
 
 		wb.store(serializer.copy(inputs.get(3)));
 
-		assertEquals(4, wb.size());
 		wb.emitWindow(collector);
 
 		assertEquals(2, collected.size());


[3/5] flink git commit: [FLINK-1660] [streaming] Increased timeout for MultiTriggerPolicyTest and introduced a constant representing it.

Posted by gy...@apache.org.
[FLINK-1660] [streaming] Increased timeout for MultiTriggerPolicyTest and introduced a constant representing it.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f09c0af2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f09c0af2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f09c0af2

Branch: refs/heads/master
Commit: f09c0af2f9a2f7844ccef0983699782c594588ca
Parents: 2bba2b3
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Tue Mar 10 09:20:04 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/windowing/policy/MultiTriggerPolicyTest.java   | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f09c0af2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
index de6cdcb..9964cd8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
@@ -31,6 +31,15 @@ import org.junit.Test;
 public class MultiTriggerPolicyTest {
 
 	/**
+	 * This constant defines the timeout (in milliseconds) for the test that
+	 * checks the startup of the active trigger policy threads.
+	 */
+	private static final int TIMEOUT = 120000;
+
+	// Use this to increase the timeout to be as long as possible.
+	// private static final int TIMEOUT=Integer.MAX_VALUE;
+
+	/**
 	 * This test covers all regular notify call. It takes no fake elements into
 	 * account.
 	 */
@@ -138,8 +147,8 @@ public class MultiTriggerPolicyTest {
 		Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
 		new Thread(runnable).start();
 
-		assertTrue("Even after 10000ms not all active policy runnables were started.",
-				cb.check(10000, 1, 2, 3));
+		assertTrue("Even after " + TIMEOUT + "ms not all active policy runnables were started.",
+				cb.check(TIMEOUT, 1, 2, 3));
 	}
 
 	private void arrayEqualityCheck(Object[] array1, Object[] array2) {


[5/5] flink git commit: [FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count

Posted by gy...@apache.org.
[FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count

Closes #465


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2522f028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2522f028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2522f028

Branch: refs/heads/master
Commit: 2522f028bde9af57ba52904855265e6a8519e724
Parents: aacd4f2
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 13:43:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 11 01:22:10 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DataStreamSource.java        |   3 +
 .../api/datastream/WindowedDataStream.java      |  50 ++--
 .../streaming/api/windowing/WindowUtils.java    |   7 +-
 .../SlidingCountGroupedPreReducer.java          |  86 ++++++
 .../windowbuffer/SlidingGroupedPreReducer.java  | 148 ++++++++++
 .../windowbuffer/SlidingPreReducer.java         |  31 +-
 .../SlidingTimeGroupedPreReducer.java           | 100 +++++++
 .../windowing/WindowIntegrationTest.java        | 155 +++++-----
 .../SlidingCountGroupedPreReducerTest.java      | 220 +++++++++++++++
 .../SlidingTimeGroupedPreReducerTest.java       | 280 +++++++++++++++++++
 10 files changed, 979 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index b596cbd..0dda976 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -35,6 +35,9 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
 			TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) {
 		super(environment, operatorType, outTypeInfo, invokable);
 		this.isParallel = isParallel;
+		if (!isParallel) {
+			setParallelism(1);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 6ce9f9f..8199b22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -56,7 +56,9 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
@@ -337,8 +339,10 @@ public class WindowedDataStream<OUT> {
 	}
 
 	private int getDiscretizerParallelism() {
-		return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
-				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+		return isLocal
+				|| WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+						dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
+				.getDegreeOfParallelism() : 1;
 	}
 
 	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
@@ -381,21 +385,35 @@ public class WindowedDataStream<OUT> {
 							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
 									.createSerializer(getExecutionConfig()));
 				}
-			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
-
-				return new SlidingCountPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
-								.createSerializer(getExecutionConfig()),
-						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-						((CountTriggerPolicy<?>) trigger).getStart());
-
-			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new SlidingCountPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()),
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							((CountTriggerPolicy<?>) trigger).getStart());
+				} else {
+					return new SlidingCountGroupedPreReducer<OUT>(
+							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+									.getType().createSerializer(getExecutionConfig()), groupByKey,
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							((CountTriggerPolicy<?>) trigger).getStart());
+				}
 
-				return new SlidingTimePreReducer<OUT>(
-						(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
-								.createSerializer(getExecutionConfig()),
-						WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-						WindowUtils.getTimeStampWrapper(trigger));
+			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new SlidingTimePreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+									.createSerializer(getExecutionConfig()),
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							WindowUtils.getTimeStampWrapper(trigger));
+				} else {
+					return new SlidingTimeGroupedPreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+									.createSerializer(getExecutionConfig()), groupByKey,
+							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+							WindowUtils.getTimeStampWrapper(trigger));
+				}
 
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 246aff2..0649b4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -44,9 +44,10 @@ public class WindowUtils {
 		}
 	}
 
-	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
-				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
+			int inputParallelism) {
+		return inputParallelism != 1
+				&& ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
 	}
 
 	public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
new file mode 100644
index 0000000..48bf1b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Grouped pre-reducer for sliding count windows.
+ */
+public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private long windowSize;
+	private long slideSize;
+	private int start;
+
+	protected long index = 0;
+
+	public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
+		super(reducer, serializer, key);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+			this.start = start;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		index = index - start;
+	}
+
+	@Override
+	protected void afterStore() {
+		index++;
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		if (index >= 0) {
+			super.store(element);
+		} else {
+			index++;
+		}
+	}
+
+	@Override
+	protected boolean currentEligible(T next) {
+		if (index <= slideSize) {
+			return true;
+		} else {
+			return index == windowSize;
+		}
+	}
+
+	@Override
+	protected void afterEmit() {
+		if (index >= slideSize) {
+			index = index - slideSize;
+		}
+	}
+
+	@Override
+	public SlidingCountGroupedPreReducer<T> clone() {
+		return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
+				slideSize, start);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
new file mode 100644
index 0000000..aa1d76c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
+	protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
+
+	protected KeySelector<T, ?> key;
+
+	public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key) {
+		super(reducer, serializer);
+		this.key = key;
+	}
+
+	public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+		Map<Object, T> finalReduce = null;
+
+		if (!reducedMap.isEmpty()) {
+			finalReduce = reducedMap.get(0);
+			for (int i = 1; i < reducedMap.size(); i++) {
+				finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
+
+			}
+			if (currentReducedMap != null) {
+				finalReduce = reduceMaps(finalReduce, currentReducedMap);
+			}
+
+		} else {
+			finalReduce = currentReducedMap;
+		}
+
+		if (finalReduce != null) {
+			currentWindow.addAll(finalReduce.values());
+			return true;
+		} else {
+			return false;
+		}
+
+	}
+
+	private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
+
+		Map<Object, T> reduced = new HashMap<Object, T>();
+
+		// Get the common keys in the maps
+		Set<Object> interSection = new HashSet<Object>();
+		Set<Object> diffFirst = new HashSet<Object>();
+		Set<Object> diffSecond = new HashSet<Object>();
+
+		for (Object key : first.keySet()) {
+			if (second.containsKey(key)) {
+				interSection.add(key);
+			} else {
+				diffFirst.add(key);
+			}
+		}
+
+		for (Object key : second.keySet()) {
+			if (!interSection.contains(key)) {
+				diffSecond.add(key);
+			}
+		}
+
+		// Reduce the common keys
+		for (Object key : interSection) {
+			reduced.put(
+					key,
+					reducer.reduce(serializer.copy(first.get(key)),
+							serializer.copy(second.get(key))));
+		}
+
+		for (Object key : diffFirst) {
+			reduced.put(key, first.get(key));
+		}
+
+		for (Object key : diffSecond) {
+			reduced.put(key, second.get(key));
+		}
+
+		return reduced;
+	}
+
+	protected void updateCurrent(T element) throws Exception {
+		if (currentReducedMap == null) {
+			currentReducedMap = new HashMap<Object, T>();
+			currentReducedMap.put(key.getKey(element), element);
+		} else {
+			Object nextKey = key.getKey(element);
+			T last = currentReducedMap.get(nextKey);
+			if (last == null) {
+				currentReducedMap.put(nextKey, element);
+			} else {
+				currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
+			}
+		}
+	}
+
+	@Override
+	protected void removeLastReduced() {
+		reducedMap.removeFirst();
+	}
+
+	@Override
+	protected void addCurrentToBuffer(T element) throws Exception {
+		reducedMap.add(currentReducedMap);
+	}
+
+	@Override
+	protected void resetCurrent() {
+		currentReducedMap = null;
+	}
+
+	@Override
+	protected boolean currentNotEmpty() {
+		return currentReducedMap != null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 1dd126d..27f7ff5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -103,15 +103,23 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 	}
 
 	protected void addToBufferIfEligible(T element) throws Exception {
-		if (currentEligible(element) && currentReduced != null) {
+		if (currentEligible(element) && currentNotEmpty()) {
 			addCurrentToBuffer(element);
 			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
-			elementsSinceLastPreAggregate = 1;
-		} else {
-			updateCurrent(element);
-
-			elementsSinceLastPreAggregate++;
+			elementsSinceLastPreAggregate = 0;
+			resetCurrent();
 		}
+		updateCurrent(element);
+
+		elementsSinceLastPreAggregate++;
+	}
+
+	protected void resetCurrent() {
+		currentReduced = null;
+	}
+
+	protected boolean currentNotEmpty() {
+		return currentReduced != null;
 	}
 
 	protected void updateCurrent(T element) throws Exception {
@@ -122,9 +130,8 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		}
 	}
 
-	protected void addCurrentToBuffer(T element) {
+	protected void addCurrentToBuffer(T element) throws Exception {
 		reduced.add(currentReduced);
-		currentReduced = element;
 	}
 
 	protected abstract boolean currentEligible(T next);
@@ -135,7 +142,7 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
 		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
 			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
-			reduced.removeFirst();
+			removeLastReduced();
 			lastPreAggregateSize = elementsPerPreAggregate.peek();
 		}
 
@@ -144,7 +151,11 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
 		}
 	}
 
-	public int max(int a, int b) {
+	protected void removeLastReduced() {
+		reduced.removeFirst();
+	}
+
+	public static int max(int a, int b) {
 		if (a > b) {
 			return a;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
new file mode 100644
index 0000000..1c293af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Grouped pre-reducer for sliding time windows.
+ */
+public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private long windowSize;
+	private long slideSize;
+	private TimestampWrapper<T> timestampWrapper;
+	private T lastStored;
+	protected long windowStartTime;
+
+	public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			KeySelector<T, ?> key, long windowSize, long slideSize,
+			TimestampWrapper<T> timestampWrapper) {
+		super(reducer, serializer, key);
+		if (windowSize > slideSize) {
+			this.windowSize = windowSize;
+			this.slideSize = slideSize;
+		} else {
+			throw new RuntimeException(
+					"Window size needs to be larger than slide size for the sliding pre-reducer");
+		}
+		this.timestampWrapper = timestampWrapper;
+		this.windowStartTime = timestampWrapper.getStartTime();
+	}
+
+	@Override
+	public void store(T element) throws Exception {
+		super.store(element);
+		lastStored = element;
+	}
+
+	@Override
+	public SlidingTimeGroupedPreReducer<T> clone() {
+		return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
+				timestampWrapper);
+	}
+
+	@Override
+	public String toString() {
+		return currentReducedMap.toString();
+	}
+
+	@Override
+	protected void afterEmit() {
+		long lastTime = timestampWrapper.getTimestamp(lastStored);
+		if (lastTime - windowStartTime >= slideSize) {
+			windowStartTime = windowStartTime + slideSize;
+		}
+	}
+
+	@Override
+	public void evict(int n) {
+		toRemove += n;
+		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+			removeLastReduced();
+			lastPreAggregateSize = elementsPerPreAggregate.peek();
+		}
+
+		if (toRemove > 0 && lastPreAggregateSize == null) {
+			resetCurrent();
+			toRemove = 0;
+		}
+	}
+
+	@Override
+	protected boolean currentEligible(T next) {
+		return windowStartTime == timestampWrapper.getStartTime()
+				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index bcfa188..d7338a0 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -44,7 +44,7 @@ public class WindowIntegrationTest implements Serializable {
 	private static final Integer MEMORYSIZE = 32;
 
 	@SuppressWarnings("serial")
-	private static class ModKey implements KeySelector<Integer, Integer> {
+	public static class ModKey implements KeySelector<Integer, Integer> {
 		private int m;
 
 		public ModKey(int m) {
@@ -126,82 +126,93 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink5());
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+				.getDiscretizedStream().addSink(new DistributedSink5());
 
 		env.execute();
 
-		// sum ( Time of 3 slide 2 )
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(5));
-		expected1.add(StreamWindow.fromElements(11));
-		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(10));
-		expected1.add(StreamWindow.fromElements(32));
-
-		validateOutput(expected1, CentralSink1.windows);
-
-		// Tumbling Time of 4 grouped by mod 2
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2, 4));
-		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(5));
-		expected2.add(StreamWindow.fromElements(10));
-		expected2.add(StreamWindow.fromElements(11, 11));
-
-		validateOutput(expected2, CentralSink2.windows);
-
-		// groupby mod 2 sum ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
-		expected3.add(StreamWindow.fromElements(4));
-		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(22));
-		expected3.add(StreamWindow.fromElements(8));
-		expected3.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected3, DistributedSink1.windows);
-
-		// groupby mod3 Tumbling Count of 2 grouped by mod 2
-		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
-		expected4.add(StreamWindow.fromElements(2, 2));
-		expected4.add(StreamWindow.fromElements(1));
-		expected4.add(StreamWindow.fromElements(4));
-		expected4.add(StreamWindow.fromElements(5, 11));
-		expected4.add(StreamWindow.fromElements(10));
-		expected4.add(StreamWindow.fromElements(11));
-		expected4.add(StreamWindow.fromElements(3));
-
-		validateOutput(expected4, DistributedSink2.windows);
-
-		// min ( Time of 2 slide 3 )
-		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(1));
-		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected5, CentralSink3.windows);
-
-		// groupby mod 2 max ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
-		expected6.add(StreamWindow.fromElements(3));
-		expected6.add(StreamWindow.fromElements(5));
-		expected6.add(StreamWindow.fromElements(11));
-		expected6.add(StreamWindow.fromElements(4));
-		expected6.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected6, DistributedSink3.windows);
-
-		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		expected7.add(StreamWindow.fromElements(10));
-		expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-		validateOutput(expected7, DistributedSink4.windows);
+		 // sum ( Time of 3 slide 2 )
+		 List<StreamWindow<Integer>> expected1 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected1.add(StreamWindow.fromElements(5));
+		 expected1.add(StreamWindow.fromElements(11));
+		 expected1.add(StreamWindow.fromElements(9));
+		 expected1.add(StreamWindow.fromElements(10));
+		 expected1.add(StreamWindow.fromElements(32));
+		
+		 validateOutput(expected1, CentralSink1.windows);
+		
+		 // Tumbling Time of 4 grouped by mod 2
+		 List<StreamWindow<Integer>> expected2 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected2.add(StreamWindow.fromElements(2, 2, 4));
+		 expected2.add(StreamWindow.fromElements(1, 3));
+		 expected2.add(StreamWindow.fromElements(5));
+		 expected2.add(StreamWindow.fromElements(10));
+		 expected2.add(StreamWindow.fromElements(11, 11));
+		
+		 validateOutput(expected2, CentralSink2.windows);
+		
+		 // groupby mod 2 sum ( Tumbling Time of 4)
+		 List<StreamWindow<Integer>> expected3 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected3.add(StreamWindow.fromElements(4));
+		 expected3.add(StreamWindow.fromElements(5));
+		 expected3.add(StreamWindow.fromElements(22));
+		 expected3.add(StreamWindow.fromElements(8));
+		 expected3.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected3, DistributedSink1.windows);
+		
+		 // groupby mod3 Tumbling Count of 2 grouped by mod 2
+		 List<StreamWindow<Integer>> expected4 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected4.add(StreamWindow.fromElements(2, 2));
+		 expected4.add(StreamWindow.fromElements(1));
+		 expected4.add(StreamWindow.fromElements(4));
+		 expected4.add(StreamWindow.fromElements(5, 11));
+		 expected4.add(StreamWindow.fromElements(10));
+		 expected4.add(StreamWindow.fromElements(11));
+		 expected4.add(StreamWindow.fromElements(3));
+		
+		 validateOutput(expected4, DistributedSink2.windows);
+		
+		 // min ( Time of 2 slide 3 )
+		 List<StreamWindow<Integer>> expected5 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected5.add(StreamWindow.fromElements(1));
+		 expected5.add(StreamWindow.fromElements(4));
+		 expected5.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected5, CentralSink3.windows);
+		
+		 // groupby mod 2 max ( Tumbling Time of 4)
+		 List<StreamWindow<Integer>> expected6 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected6.add(StreamWindow.fromElements(3));
+		 expected6.add(StreamWindow.fromElements(5));
+		 expected6.add(StreamWindow.fromElements(11));
+		 expected6.add(StreamWindow.fromElements(4));
+		 expected6.add(StreamWindow.fromElements(10));
+		
+		 validateOutput(expected6, DistributedSink3.windows);
+		
+		 List<StreamWindow<Integer>> expected7 = new
+		 ArrayList<StreamWindow<Integer>>();
+		 expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		 expected7.add(StreamWindow.fromElements(10));
+		 expected7.add(StreamWindow.fromElements(10, 11, 11));
+		
+		 validateOutput(expected7, DistributedSink4.windows);
 
 		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
-		expected8.add(StreamWindow.fromElements(12));
-		expected8.add(StreamWindow.fromElements(9));
-		expected8.add(StreamWindow.fromElements(32));
+		expected8.add(StreamWindow.fromElements(4, 8));
+		expected8.add(StreamWindow.fromElements(4, 5));
+		expected8.add(StreamWindow.fromElements(10, 22));
+
+		for (List<Integer> sw : DistributedSink5.windows) {
+			Collections.sort(sw);
+		}
 
 		validateOutput(expected8, DistributedSink5.windows);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
new file mode 100644
index 0000000..4e63f89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountGroupedPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null); // Integer serializer derived from a sample value
+	// Reduce function handed to every pre-reducer below: adds two Integers (see SumReducer).
+	ReduceFunction<Integer> reducer = new SumReducer();
+	// Grouping key; ModKey(2) presumably keys each value by value % 2 - TODO confirm in WindowIntegrationTest.
+	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test. The ints (3, 2, 0) are presumably window size, slide size, start - TODO confirm.
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, 0);
+		// Drive the buffer by hand: store elements 1..13 and issue evict/emitWindow calls directly.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(9, 18));
+		expected.add(StreamWindow.fromElements(11, 22));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test. The ints (5, 2, 0) are presumably window size, slide size, start - TODO confirm.
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 2, 0);
+		// Drive the buffer by hand: the first eviction only happens after six elements have been stored.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(12, 8));
+		expected.add(StreamWindow.fromElements(18, 12));
+		expected.add(StreamWindow.fromElements(24, 16));
+		expected.add(StreamWindow.fromElements(30, 20));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test. The ints (6, 3, 0) are presumably window size, slide size, start - TODO confirm.
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 6, 3, 0);
+		// Drive the buffer by hand: elements arrive in groups of three, each group followed by an emit.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(2, 4));
+		expected.add(StreamWindow.fromElements(9, 12));
+		expected.add(StreamWindow.fromElements(21, 18));
+		expected.add(StreamWindow.fromElements(30, 27));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test. The ints (5, 1, 2) are presumably window size, slide size, start - TODO confirm.
+		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 1, 2);
+		// The first two stored 1s are evicted again before the first emit, so the first window holds a single 1.
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.evict(1);
+		preReducer.store(1);
+		preReducer.emitWindow(collector);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(7);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1));
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(9, 6));
+		expected.add(StreamWindow.fromElements(8, 12));
+		expected.add(StreamWindow.fromElements(15, 10));
+		expected.add(StreamWindow.fromElements(12, 18));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+		// Adds the two Integer values; used as the reduce function of the pre-reducers under test.
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
new file mode 100644
index 0000000..83ad7ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimeGroupedPreReducerTest {
+
+	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null); // Integer serializer derived from a sample value
+	// Reduce function handed to every pre-reducer below: adds two Integers (see SumReducer).
+	ReduceFunction<Integer> reducer = new SumReducer();
+	// Grouping key; ModKey(2) presumably keys each value by value % 2 - TODO confirm in WindowIntegrationTest.
+	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+	@Test
+	public void testPreReduce1() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test; getTimestamp returns the element itself. (3, 2) are presumably window and slide size - TODO confirm.
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+		// Drive the buffer by hand: store elements 1..13 and issue evict/emitWindow calls directly.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(9, 18));
+		expected.add(StreamWindow.fromElements(11, 22));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	protected static void checkResults(List<StreamWindow<Integer>> expected,
+			List<StreamWindow<Integer>> actual) {
+		// Sort every window on both sides so value order inside a window is ignored; note that both arguments are mutated.
+		for (StreamWindow<Integer> sw : expected) {
+			Collections.sort(sw);
+		}
+
+		for (StreamWindow<Integer> sw : actual) {
+			Collections.sort(sw);
+		}
+		// The two lists are then compared with assertEquals; how StreamWindow defines equality is not visible here.
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testPreReduce2() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test; getTimestamp returns the element itself. (5, 2) are presumably window and slide size - TODO confirm.
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+		// Drive the buffer by hand: the first eviction only happens after six elements have been stored.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.emitWindow(collector);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(9);
+		preReducer.store(10);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(4, 6));
+		expected.add(StreamWindow.fromElements(12, 8));
+		expected.add(StreamWindow.fromElements(18, 12));
+		expected.add(StreamWindow.fromElements(24, 16));
+		expected.add(StreamWindow.fromElements(30, 20));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce3() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test; getTimestamp returns the element itself. (6, 3) are presumably window and slide size - TODO confirm.
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+		// Drive the buffer by hand: elements arrive in groups of three, each group followed by an emit.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.store(3);
+		preReducer.emitWindow(collector);
+		preReducer.store(4);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.store(9);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(10);
+		preReducer.store(11);
+		preReducer.store(12);
+		preReducer.emitWindow(collector);
+		preReducer.evict(3);
+		preReducer.store(13);
+		// One expected window per emitWindow call, each holding one summed value per key group.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(2, 4));
+		expected.add(StreamWindow.fromElements(9, 12));
+		expected.add(StreamWindow.fromElements(21, 18));
+		expected.add(StreamWindow.fromElements(30, 27));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	@Test
+	public void testPreReduce4() throws Exception {
+		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		// Pre-reducer under test; getTimestamp returns the element itself. (3, 2) are presumably window and slide size - TODO confirm.
+		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+						new Timestamp<Integer>() {
+
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public long getTimestamp(Integer value) {
+								return value;
+							}
+						}, 1));
+		// Stores 1..8, then 14 and 21 with gaps in between; emitWindow is called repeatedly between the sparse stores.
+		preReducer.store(1);
+		preReducer.store(2);
+		preReducer.emitWindow(collector);
+		preReducer.store(3);
+		preReducer.store(4);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(5);
+		preReducer.store(6);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(7);
+		preReducer.store(8);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(2);
+		preReducer.store(14);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+		preReducer.emitWindow(collector);
+		preReducer.store(21);
+		preReducer.emitWindow(collector);
+		preReducer.evict(1);
+		preReducer.emitWindow(collector);
+
+		preReducer.store(9);
+		// 12 emitWindow calls vs. 9 expected windows: presumably the calls made while the buffer is empty emit nothing.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(1, 2));
+		expected.add(StreamWindow.fromElements(3, 6));
+		expected.add(StreamWindow.fromElements(5, 10));
+		expected.add(StreamWindow.fromElements(7, 14));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(8));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(14));
+		expected.add(StreamWindow.fromElements(21));
+		// checkResults sorts each window before comparing, so value order inside a window is irrelevant.
+		checkResults(expected, collector.getCollected());
+	}
+
+	private static class SumReducer implements ReduceFunction<Integer> {
+		// Adds the two Integer values; used as the reduce function of the pre-reducers under test.
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+}


[4/5] flink git commit: [FLINK-1657] [streaming] Count window parallel discretization

Posted by gy...@apache.org.
[FLINK-1657] [streaming] Count window parallel discretization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2be00ac7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2be00ac7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2be00ac7

Branch: refs/heads/master
Commit: 2be00ac7aca75c2fb9b66a554873b59970bfa21c
Parents: f09c0af
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 7 20:07:46 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       |  2 +
 .../api/datastream/WindowedDataStream.java      | 70 ++++++--------------
 .../windowing/WindowBufferInvokable.java        |  3 +
 .../streaming/api/windowing/WindowUtils.java    | 70 ++++++++++++++++++++
 .../windowing/WindowIntegrationTest.java        | 54 +++++++--------
 5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index dc2e987..451acf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartiti
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 
 /**
  * A {@link DiscretizedStream} represents a data stream that has been divided

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 12ef0e6..742ff22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -44,6 +43,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferI
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
+import org.apache.flink.streaming.api.windowing.WindowUtils;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -71,16 +72,6 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
  */
 public class WindowedDataStream<OUT> {
 
-	protected enum WindowTransformation {
-		REDUCEWINDOW, MAPWINDOW, NONE;
-		private Function UDF;
-
-		public WindowTransformation with(Function UDF) {
-			this.UDF = UDF;
-			return this;
-		}
-	}
-
 	protected DataStream<OUT> dataStream;
 
 	protected boolean isLocal = false;
@@ -266,8 +257,7 @@ public class WindowedDataStream<OUT> {
 		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
 				.with(reduceFunction);
 
-		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, getTrigger(),
-				getEviction(), discretizerKey);
+		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
 
 		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
@@ -324,11 +314,9 @@ public class WindowedDataStream<OUT> {
 	private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
 			WindowBuffer<OUT> windowBuffer) {
 
-		StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer(getTrigger(),
-				getEviction(), discretizerKey);
+		StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
 
-		StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(
-				windowBuffer, discretizerKey);
+		StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
@@ -346,30 +334,27 @@ public class WindowedDataStream<OUT> {
 	}
 
 	private int getDiscretizerParallelism() {
-		return isLocal || (discretizerKey != null) ? dataStream.environment
-				.getDegreeOfParallelism() : 1;
+		return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
+				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
 	}
 
-	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer(TriggerPolicy<OUT> trigger,
-			EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {
-
+	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
 		if (discretizerKey == null) {
-			return new StreamDiscretizer<OUT>(trigger, eviction);
-		} else if (trigger instanceof TimeTriggerPolicy
-				&& ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
+			return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+		} else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
 			return new GroupedTimeDiscretizer<OUT>(discretizerKey,
-					(TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
-		}
-
-		else {
+					(TimeTriggerPolicy<OUT>) getTrigger(),
+					(CloneableEvictionPolicy<OUT>) getEviction());
+		} else {
 			return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-					(CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
+					(CloneableTriggerPolicy<OUT>) getTrigger(),
+					(CloneableEvictionPolicy<OUT>) getEviction());
 		}
 
 	}
 
 	private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(
-			WindowBuffer<OUT> windowBuffer, KeySelector<OUT, ?> discretizerKey) {
+			WindowBuffer<OUT> windowBuffer) {
 		if (discretizerKey == null) {
 			return new WindowBufferInvokable<OUT>(windowBuffer);
 		} else {
@@ -378,20 +363,18 @@ public class WindowedDataStream<OUT> {
 	}
 
 	@SuppressWarnings("unchecked")
-	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
-			TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
-			KeySelector<OUT, ?> discretizerKey) {
+	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
 
 		if (transformation == WindowTransformation.REDUCEWINDOW
-				&& eviction instanceof TumblingEvictionPolicy) {
+				&& getEviction() instanceof TumblingEvictionPolicy) {
 			if (groupByKey == null) {
 				return new TumblingPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.UDF), getType()
+						clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
 								.createSerializer(getExecutionConfig()));
 			} else {
 				return new TumblingGroupedPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.UDF), groupByKey,
-						getType().createSerializer(getExecutionConfig()));
+						clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
+								.createSerializer(getExecutionConfig()));
 			}
 		}
 		return new BasicWindowBuffer<OUT>();
@@ -678,15 +661,4 @@ public class WindowedDataStream<OUT> {
 	protected WindowedDataStream<OUT> copy() {
 		return new WindowedDataStream<OUT>(this);
 	}
-
-	protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer getKey(StreamWindow<R> value) throws Exception {
-			return value.windowID;
-		}
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..72be823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,10 +52,13 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
 	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
 			throws Exception { // routes one window event to the buffer: store, evict or emitWindow
 		if (windowEvent.isElement()) {
+			System.out.println("element: " + windowEvent.getElement()); // NOTE(review): stdout print on every element event; looks like leftover debugging (same for the two prints below) - confirm and remove
 			buffer.store(windowEvent.getElement());
 		} else if (windowEvent.isEviction()) {
+			System.out.println("eviction: " + windowEvent.getEviction());
 			buffer.evict(windowEvent.getEviction());
 		} else if (windowEvent.isTrigger()) {
+			System.out.println("trigger");
 			buffer.emitWindow(collector);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
new file mode 100644
index 0000000..1e3ba86
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+
+public class WindowUtils { // static helpers and shared types for inspecting window trigger/eviction policies
+	// Kind of window transformation, optionally carrying the user function attached through with().
+	public enum WindowTransformation {
+		REDUCEWINDOW, MAPWINDOW, NONE;
+		private Function UDF;
+		// NOTE(review): UDF lives on the shared enum constant, so a later with() on the same constant overwrites it.
+		public WindowTransformation with(Function UDF) {
+			this.UDF = UDF;
+			return this;
+		}
+
+		public Function getUDF() {
+			return UDF;
+		}
+	}
+	// True for count eviction with a count or time trigger, or for tumbling eviction with a count trigger.
+	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+	}
+	// True only when the trigger is a TimeTriggerPolicy and the eviction is a TimeEvictionPolicy.
+	public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
+	}
+	// True when the trigger is a TimeTriggerPolicy whose timestampWrapper reports isDefaultTimestamp().
+	public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
+		return trigger instanceof TimeTriggerPolicy
+				&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
+	}
+	// Key selector that keys a StreamWindow by its windowID field.
+	public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(StreamWindow<R> value) throws Exception {
+			return value.windowID;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 2ed0002..25b5f87 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,17 +89,6 @@ public class WindowIntegrationTest implements Serializable {
 		inputs.add(11);
 		inputs.add(11);
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
-		DataStream<Integer> source = env.fromCollection(inputs);
-
-		source.window(Count.of(2)).every(Count.of(3)).sum(0).getDiscretizedStream()
-				.addSink(new CentralSink1());
-
-		source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new CentralSink2());
-
-		KeySelector<Integer, ?> key = new ModKey(2);
 		Timestamp<Integer> ts = new Timestamp<Integer>() {
 
 			private static final long serialVersionUID = 1L;
@@ -110,38 +99,50 @@ public class WindowIntegrationTest implements Serializable {
 			}
 		};
 
+		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+
+		DataStream<Integer> source = env.fromCollection(inputs);
+
+		source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+				.addSink(new CentralSink1());
+
+		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+				.flatten().addSink(new CentralSink2());
+
+		KeySelector<Integer, ?> key = new ModKey(2);
+
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new DistributedSink1());
 
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
 
-		source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
+		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
 				.addSink(new CentralSink3());
 
 		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
 				.addSink(new DistributedSink3());
 
-		source.window(Count.of(5)).mapWindow(new IdentityWindowMap()).flatten()
+		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
 		env.execute();
 
-		// sum ( Count of 2 slide 3 )
+		// sum ( Time of 2 slide 3 )
 		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(4));
+		expected1.add(StreamWindow.fromElements(5));
 		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(22));
+		expected1.add(StreamWindow.fromElements(32));
 
 		validateOutput(expected1, CentralSink1.windows);
 
-		// Tumbling Count of 4 grouped by mod 2
+		// Tumbling Time of 4 grouped by mod 2
 		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2));
+		expected2.add(StreamWindow.fromElements(2, 2, 4));
 		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(4, 10));
-		expected2.add(StreamWindow.fromElements(5, 11));
-		expected2.add(StreamWindow.fromElements(11));
+		expected2.add(StreamWindow.fromElements(5));
+		expected2.add(StreamWindow.fromElements(10));
+		expected2.add(StreamWindow.fromElements(11, 11));
 
 		validateOutput(expected2, CentralSink2.windows);
 
@@ -167,11 +168,11 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected4, DistributedSink2.windows);
 
-		// min ( Count of 2 slide 3 )
+		// min ( Time of 2 slide 3 )
 		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(2));
+		expected5.add(StreamWindow.fromElements(1));
 		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(11));
+		expected5.add(StreamWindow.fromElements(10));
 
 		validateOutput(expected5, CentralSink3.windows);
 
@@ -186,8 +187,9 @@ public class WindowIntegrationTest implements Serializable {
 		validateOutput(expected6, DistributedSink3.windows);
 
 		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
-		expected7.add(StreamWindow.fromElements(5, 10, 11, 11));
+		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		expected7.add(StreamWindow.fromElements(10));
+		expected7.add(StreamWindow.fromElements(10, 11, 11));
 
 		validateOutput(expected7, DistributedSink4.windows);