You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/22 23:13:30 UTC

flink git commit: [streaming] FullStream window helper added + partitioner bugfix

Repository: flink
Updated Branches:
  refs/heads/master 579f991ea -> 56cb7937e


[streaming] FullStream window helper added + partitioner bugfix

Closes #614


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cb7937
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cb7937
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cb7937

Branch: refs/heads/master
Commit: 56cb7937e0b03b16214a3b573754b6e6f082fa43
Parents: 579f991
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Apr 21 13:33:14 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 22 23:12:24 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   4 +
 .../streaming/api/datastream/DataStream.java    |  36 ++--
 .../api/datastream/WindowedDataStream.java      |  57 +++++--
 .../windowing/WindowPartExtractor.java          |   2 +-
 .../streaming/api/windowing/WindowUtils.java    |   8 +-
 .../api/windowing/helper/FullStream.java        |  57 +++++++
 .../windowing/policy/KeepAllEvictionPolicy.java |  29 ++++
 .../windowbuffer/TumblingGroupedPreReducer.java |  26 ++-
 .../windowbuffer/TumblingPreReducer.java        |  23 ++-
 .../partitioner/DistributePartitioner.java      |   6 +
 .../runtime/partitioner/StreamPartitioner.java  |   4 +
 .../windowing/WindowIntegrationTest.java        | 169 +++++++++++++++----
 .../TumblingGroupedPreReducerTest.java          |   9 +-
 .../windowbuffer/TumblingPreReducerTest.java    |   5 +-
 .../flink/streaming/api/scala/DataStream.scala  |   7 +
 15 files changed, 357 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 6dae978..e630e69 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -634,6 +634,7 @@ Several predefined policies are provided in the API, including delta-based, coun
  * `Time.of(…)`
  * `Count.of(…)`
  * `Delta.of(…)`
+ * `FullStream.window()`
 
 For detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/).
 
@@ -774,6 +775,9 @@ The above call would create global windows of 1000 elements group it by the firs
 
 Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.
 
+#### Periodic aggregations on the full stream history
+Sometimes it is necessary to aggregate over all the previously seen data in the stream. For this purpose either use the `dataStream.window(FullStream.window()).every(trigger)` or equivalently `dataStream.every(trigger)`. 
+
 #### Global vs local discretisation
 By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a parallelism of 1 to be able to correctly execute the discretisation logic.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f4d4965..7a6a0fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -55,9 +55,9 @@ import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -73,6 +73,7 @@ import org.apache.flink.streaming.api.operators.StreamReduce;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.FullStream;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -155,7 +156,7 @@ public class DataStream<OUT> {
 		this.id = dataStream.id;
 		this.parallelism = dataStream.parallelism;
 		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
-		this.partitioner = dataStream.partitioner;
+		this.partitioner = dataStream.partitioner.copy();
 		this.streamGraph = dataStream.streamGraph;
 		this.typeInfo = dataStream.typeInfo;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
@@ -573,8 +574,8 @@ public class DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
 				Utils.getCallLocationName(), false);
 
-		return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder),
-				initialValue, outType));
+		return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder), initialValue,
+				outType));
 	}
 
 	/**
@@ -910,11 +911,11 @@ public class DataStream<OUT> {
 	 * transformation like {@link WindowedDataStream#reduceWindow},
 	 * {@link WindowedDataStream#mapWindow} or aggregations on preset
 	 * chunks(windows) of the data stream. To define windows a
-	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
-	 * {@link Delta} can be used.</br></br> When applied to a grouped data
-	 * stream, the windows (evictions) and slide sizes (triggers) will be
-	 * computed on a per group basis. </br></br> For more advanced control over
-	 * the trigger and eviction policies please refer to
+	 * {@link WindowingHelper} such as {@link Time}, {@link Count},
+	 * {@link Delta} and {@link FullStream} can be used.</br></br> When applied
+	 * to a grouped data stream, the windows (evictions) and slide sizes
+	 * (triggers) will be computed on a per group basis. </br></br> For more
+	 * advanced control over the trigger and eviction policies please refer to
 	 * {@link #window(trigger, eviction)} </br> </br> For example to create a
 	 * sum every 5 seconds in a tumbling fashion:</br>
 	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
@@ -927,7 +928,8 @@ public class DataStream<OUT> {
 	 * 
 	 * @param policyHelper
 	 *            Any {@link WindowingHelper} such as {@link Time},
-	 *            {@link Count} and {@link Delta} to define the window size.
+	 *            {@link Count}, {@link Delta} {@link FullStream} to define the
+	 *            window size.
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -956,6 +958,17 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Create a {@link WindowedDataStream} on the full stream history, to
+	 * produce periodic aggregates.
+	 * 
+	 * @return A {@link WindowedDataStream} providing further operations.
+	 */
+	@SuppressWarnings("rawtypes")
+	public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+		return window(FullStream.window()).every(policyHelper);
+	}
+
+	/**
 	 * Writes a DataStream to the standard output stream (stdout).<br>
 	 * For each element of the DataStream the result of
 	 * {@link Object#toString()} is written.
@@ -1266,8 +1279,7 @@ public class DataStream<OUT> {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
 				sinkOperator);
 
-		streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null,
-				"Stream Sink");
+		streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null, "Stream Sink");
 
 		this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index c2ebdcf..da8611e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.RichWindowMapFunction;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer;
 import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
@@ -55,9 +55,14 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
@@ -65,10 +70,6 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedP
 import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
@@ -240,6 +241,9 @@ public class WindowedDataStream<OUT> {
 	 * @return The discretised stream
 	 */
 	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+		if (getEviction() instanceof KeepAllEvictionPolicy) {
+			throw new RuntimeException("Cannot get discretized stream for full stream window");
+		}
 		return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
 				.getDiscretizedStream();
 	}
@@ -347,7 +351,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
 		return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
-				new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction);
+				getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
 	}
 
 	/**
@@ -372,7 +376,8 @@ public class WindowedDataStream<OUT> {
 			TypeInformation<R> outType) {
 
 		return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
-				new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction, outType);
+				getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction,
+				outType);
 	}
 
 	private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
@@ -393,9 +398,7 @@ public class WindowedDataStream<OUT> {
 				.setParallelism(parallelism)
 				.transform(windowBuffer.getClass().getSimpleName(),
 						new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
-				.setParallelism(parallelism), groupByKey, transformation,
-				WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
-						dataStream.getParallelism()));
+				.setParallelism(parallelism), groupByKey, transformation, false);
 
 	}
 
@@ -497,14 +500,26 @@ public class WindowedDataStream<OUT> {
 
 		if (transformation == WindowTransformation.REDUCEWINDOW) {
 			if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
-				if (groupByKey == null) {
-					return new TumblingPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), getType()
-									.createSerializer(getExecutionConfig()));
+				if (eviction instanceof KeepAllEvictionPolicy) {
+					if (groupByKey == null) {
+						return new TumblingPreReducer<OUT>(
+								(ReduceFunction<OUT>) transformation.getUDF(), getType()
+										.createSerializer(getExecutionConfig())).noEvict();
+					} else {
+						return new TumblingGroupedPreReducer<OUT>(
+								(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+								getType().createSerializer(getExecutionConfig())).noEvict();
+					}
 				} else {
-					return new TumblingGroupedPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
-									.createSerializer(getExecutionConfig()));
+					if (groupByKey == null) {
+						return new TumblingPreReducer<OUT>(
+								(ReduceFunction<OUT>) transformation.getUDF(), getType()
+										.createSerializer(getExecutionConfig()));
+					} else {
+						return new TumblingGroupedPreReducer<OUT>(
+								(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+								getType().createSerializer(getExecutionConfig()));
+					}
 				}
 			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
 				if (groupByKey == null) {
@@ -564,7 +579,13 @@ public class WindowedDataStream<OUT> {
 				}
 			}
 		}
-		return new BasicWindowBuffer<OUT>();
+
+		if (eviction instanceof KeepAllEvictionPolicy) {
+			throw new RuntimeException(
+					"Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
+		} else {
+			return new BasicWindowBuffer<OUT>();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
index 577e7b7..50b4e7d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
@@ -40,7 +40,7 @@ public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OU
 		// We dont emit new values for the same index, this avoids sending the
 		// same information for the same partitioned window multiple times
 		if (value.windowID != lastIndex) {
-
+			
 			// For empty windows we send 0 since these windows will be filtered
 			// out
 			if (value.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 944a478..dd62a44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
@@ -118,7 +119,7 @@ public class WindowUtils {
 	}
 
 	public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (eviction instanceof TumblingEvictionPolicy) {
+		if (eviction instanceof TumblingEvictionPolicy || eviction instanceof KeepAllEvictionPolicy) {
 			return true;
 		} else if (isTimeOnly(trigger, eviction)) {
 			long slide = getSlideSize(trigger);
@@ -140,7 +141,8 @@ public class WindowUtils {
 	}
 
 	public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
+		return trigger instanceof TimeTriggerPolicy
+				&& (eviction instanceof TimeEvictionPolicy || eviction instanceof KeepAllEvictionPolicy);
 	}
 
 	public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
@@ -170,7 +172,7 @@ public class WindowUtils {
 
 			return slide > window
 					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
-					.getStart()
+							.getStart()
 					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
 		} else {
 			return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
new file mode 100644
index 0000000..3508b26
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * Window that represents the full stream history. Can be used only as eviction
+ * policy and only with operations that support pre-aggregator such as reduce or
+ * aggregations.
+ */
+public class FullStream<DATA> implements WindowingHelper<DATA>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private FullStream() {
+	}
+
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new KeepAllEvictionPolicy<DATA>();
+	}
+
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		throw new RuntimeException(
+				"Full stream policy can be only used as eviction. Use .every(..) after the window call.");
+	}
+
+	/**
+	 * Returns a helper representing an eviction that keeps all previous record
+	 * history.
+	 */
+	public static <R> FullStream<R> window() {
+		return new FullStream<R>();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
new file mode 100644
index 0000000..6fad749
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+public class KeepAllEvictionPolicy<T> implements EvictionPolicy<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public int notifyEviction(T datapoint, boolean triggered, int bufferSize) {
+		return 0;
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 68f9837..d2f6234 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -37,16 +37,23 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
 	private KeySelector<T, ?> keySelector;
 
 	private Map<Object, T> reducedValues;
-	private Map<Object, T> keyInstancePerKey = new HashMap<Object, T>();
 
 	private TypeSerializer<T> serializer;
 
+	private boolean evict = true;
+
 	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
 			TypeSerializer<T> serializer) {
+		this(reducer, keySelector, serializer, true);
+	}
+
+	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
+			TypeSerializer<T> serializer, boolean evict) {
 		this.reducer = reducer;
 		this.serializer = serializer;
 		this.keySelector = keySelector;
 		this.reducedValues = new HashMap<Object, T>();
+		this.evict = evict;
 	}
 
 	public void emitWindow(Collector<StreamWindow<T>> collector) {
@@ -55,11 +62,12 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
 			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.addAll(reducedValues.values());
 			collector.collect(currentWindow);
-			reducedValues.clear();
 		} else if (emitEmpty) {
 			collector.collect(createEmptyWindow());
 		}
-
+		if (evict) {
+			reducedValues.clear();
+		}
 	}
 
 	public void store(T element) throws Exception {
@@ -74,18 +82,15 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
 		}
 
 		reducedValues.put(key, reduced);
-
-		if (emitPerGroup && !keyInstancePerKey.containsKey(key)) {
-			keyInstancePerKey.put(key, element);
-		}
 	}
 
+	@Override
 	public void evict(int n) {
 	}
 
 	@Override
 	public TumblingGroupedPreReducer<T> clone() {
-		return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);
+		return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer, evict);
 	}
 
 	@Override
@@ -93,4 +98,9 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
 		return reducedValues.toString();
 	}
 
+	public TumblingGroupedPreReducer<T> noEvict() {
+		this.evict = false;
+		return this;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index d08d207..f396e41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -23,7 +23,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
 /**
- * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the same as the window size).
+ * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the
+ * same as the window size).
  */
 public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
 
@@ -34,9 +35,17 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
 	private T reduced;
 	private TypeSerializer<T> serializer;
 
+	private boolean evict = true;
+
 	public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+		this(reducer, serializer, true);
+	}
+
+	private TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+			boolean evict) {
 		this.reducer = reducer;
 		this.serializer = serializer;
+		this.evict = evict;
 	}
 
 	public void emitWindow(Collector<StreamWindow<T>> collector) {
@@ -44,10 +53,13 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
 			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.add(reduced);
 			collector.collect(currentWindow);
-			reduced = null;
 		} else if (emitEmpty) {
 			collector.collect(createEmptyWindow());
 		}
+
+		if (evict) {
+			reduced = null;
+		}
 	}
 
 	public void store(T element) throws Exception {
@@ -63,7 +75,7 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
 
 	@Override
 	public TumblingPreReducer<T> clone() {
-		return new TumblingPreReducer<T>(reducer, serializer);
+		return new TumblingPreReducer<T>(reducer, serializer, evict);
 	}
 
 	@Override
@@ -77,4 +89,9 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
 		return this;
 	}
 
+	public TumblingPreReducer<T> noEvict() {
+		this.evict = false;
+		return this;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
index 3110da9..0fa191b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
@@ -31,9 +31,11 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int[] returnArray = new int[] {-1};
+	private boolean forward;
 
 	public DistributePartitioner(boolean forward) {
 		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
+		this.forward = forward;
 	}
 
 	@Override
@@ -43,4 +45,8 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> {
 
 		return this.returnArray;
 	}
+	
+	public StreamPartitioner<T> copy() {
+		return new DistributePartitioner<T>(forward);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index 2699c3f..cd5b9c2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -41,4 +41,8 @@ public abstract class StreamPartitioner<T> implements
 	public PartitioningStrategy getStrategy() {
 		return strategy;
 	}
+
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index 54c4b17..0593c55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.operators.windowing;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,6 +36,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.FullStream;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
@@ -77,6 +80,7 @@ public class WindowIntegrationTest implements Serializable {
 
 	}
 
+	@SuppressWarnings("serial")
 	@Test
 	public void test() throws Exception {
 
@@ -108,28 +112,49 @@ public class WindowIntegrationTest implements Serializable {
 		DataStream<Integer> source = env.fromCollection(inputs);
 
 		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new CentralSink1());
+				.addSink(new TestSink1());
 
 		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new CentralSink2());
+				.flatten().addSink(new TestSink2());
 
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink1());
+				.addSink(new TestSink4());
 
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
+				.mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
 
 		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
-				.addSink(new CentralSink3());
+				.addSink(new TestSink3());
 
 		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
-				.addSink(new DistributedSink3());
+				.addSink(new TestSink6());
 
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
-				.addSink(new DistributedSink4());
+				.addSink(new TestSink7());
 
 		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
-				.getDiscretizedStream().addSink(new DistributedSink5());
+				.getDiscretizedStream().addSink(new TestSink8());
+
+		try {
+			source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+		try {
+			source.window(FullStream.window()).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+		try {
+			source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+
+		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
+
+		source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+				.getDiscretizedStream().addSink(new TestSink12());
 
 		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
 			private static final long serialVersionUID = 1L;
@@ -164,11 +189,17 @@ public class WindowIntegrationTest implements Serializable {
 			}
 		});
 
-		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink6());
+		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
 
 		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink7());
+				.addSink(new TestSink10());
+
+		source.map(new MapFunction<Integer, Integer>() {
+			@Override
+			public Integer map(Integer value) throws Exception {
+				return value;
+			}
+		}).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
 
 		env.execute();
 
@@ -180,7 +211,7 @@ public class WindowIntegrationTest implements Serializable {
 		expected1.add(StreamWindow.fromElements(10));
 		expected1.add(StreamWindow.fromElements(32));
 
-		validateOutput(expected1, CentralSink1.windows);
+		validateOutput(expected1, TestSink1.windows);
 
 		// Tumbling Time of 4 grouped by mod 2
 		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
@@ -190,7 +221,7 @@ public class WindowIntegrationTest implements Serializable {
 		expected2.add(StreamWindow.fromElements(10));
 		expected2.add(StreamWindow.fromElements(11, 11));
 
-		validateOutput(expected2, CentralSink2.windows);
+		validateOutput(expected2, TestSink2.windows);
 
 		// groupby mod 2 sum ( Tumbling Time of 4)
 		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
@@ -200,7 +231,7 @@ public class WindowIntegrationTest implements Serializable {
 		expected3.add(StreamWindow.fromElements(8));
 		expected3.add(StreamWindow.fromElements(10));
 
-		validateOutput(expected3, DistributedSink1.windows);
+		validateOutput(expected3, TestSink4.windows);
 
 		// groupby mod3 Tumbling Count of 2 grouped by mod 2
 		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
@@ -212,7 +243,7 @@ public class WindowIntegrationTest implements Serializable {
 		expected4.add(StreamWindow.fromElements(11));
 		expected4.add(StreamWindow.fromElements(3));
 
-		validateOutput(expected4, DistributedSink2.windows);
+		validateOutput(expected4, TestSink5.windows);
 
 		// min ( Time of 2 slide 3 )
 		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
@@ -220,7 +251,7 @@ public class WindowIntegrationTest implements Serializable {
 		expected5.add(StreamWindow.fromElements(4));
 		expected5.add(StreamWindow.fromElements(10));
 
-		validateOutput(expected5, CentralSink3.windows);
+		validateOutput(expected5, TestSink3.windows);
 
 		// groupby mod 2 max ( Tumbling Time of 4)
 		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
@@ -230,25 +261,25 @@ public class WindowIntegrationTest implements Serializable {
 		expected6.add(StreamWindow.fromElements(4));
 		expected6.add(StreamWindow.fromElements(10));
 
-		validateOutput(expected6, DistributedSink3.windows);
+		validateOutput(expected6, TestSink6.windows);
 
 		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
 		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
 		expected7.add(StreamWindow.fromElements(10));
 		expected7.add(StreamWindow.fromElements(10, 11, 11));
 
-		validateOutput(expected7, DistributedSink4.windows);
+		validateOutput(expected7, TestSink7.windows);
 
 		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
 		expected8.add(StreamWindow.fromElements(4, 8));
 		expected8.add(StreamWindow.fromElements(4, 5));
 		expected8.add(StreamWindow.fromElements(10, 22));
 
-		for (List<Integer> sw : DistributedSink5.windows) {
+		for (List<Integer> sw : TestSink8.windows) {
 			Collections.sort(sw);
 		}
 
-		validateOutput(expected8, DistributedSink5.windows);
+		validateOutput(expected8, TestSink8.windows);
 
 		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
 		expected9.add(StreamWindow.fromElements(6));
@@ -257,17 +288,50 @@ public class WindowIntegrationTest implements Serializable {
 		expected9.add(StreamWindow.fromElements(30));
 		expected9.add(StreamWindow.fromElements(38));
 
-		validateOutput(expected9, DistributedSink6.windows);
+		validateOutput(expected9, TestSink9.windows);
 
 		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
 		expected10.add(StreamWindow.fromElements(6, 9));
 		expected10.add(StreamWindow.fromElements(16, 24));
 
-		for (List<Integer> sw : DistributedSink7.windows) {
+		for (List<Integer> sw : TestSink10.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected10, TestSink10.windows);
+
+		List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
+		expected11.add(StreamWindow.fromElements(8));
+		expected11.add(StreamWindow.fromElements(38));
+		expected11.add(StreamWindow.fromElements(49));
+
+		for (List<Integer> sw : TestSink11.windows) {
 			Collections.sort(sw);
 		}
 
-		validateOutput(expected10, DistributedSink7.windows);
+		validateOutput(expected11, TestSink11.windows);
+
+		List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
+		expected12.add(StreamWindow.fromElements(4, 4));
+		expected12.add(StreamWindow.fromElements(18, 20));
+		expected12.add(StreamWindow.fromElements(18, 31));
+
+		for (List<Integer> sw : TestSink12.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected12, TestSink12.windows);
+
+		List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
+		expected13.add(StreamWindow.fromElements(17));
+		expected13.add(StreamWindow.fromElements(27));
+		expected13.add(StreamWindow.fromElements(49));
+
+		for (List<Integer> sw : TestSink13.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected13, TestSink13.windows);
 
 	}
 
@@ -276,7 +340,46 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class CentralSink1 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -289,7 +392,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class CentralSink2 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -302,7 +405,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class CentralSink3 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -315,7 +418,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink1 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -328,7 +431,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink2 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -341,7 +444,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink3 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -354,7 +457,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink4 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -367,7 +470,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -380,7 +483,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink6 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@@ -393,7 +496,7 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
-	private static class DistributedSink7 implements SinkFunction<StreamWindow<Integer>> {
+	private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
 
 		public static List<StreamWindow<Integer>> windows = Collections
 				.synchronizedList(new ArrayList<StreamWindow<Integer>>());

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index ee2379f..c5107bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -66,6 +66,7 @@ public class TumblingGroupedPreReducerTest {
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.emitWindow(collector);
+		wb.evict(2);
 
 		assertEquals(1, collected.size());
 
@@ -76,12 +77,10 @@ public class TumblingGroupedPreReducerTest {
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
 
-		// Nothing should happen here
-		wb.evict(3);
-
 		wb.store(serializer.copy(inputs.get(3)));
 
 		wb.emitWindow(collector);
+		wb.evict(4);
 
 		assertEquals(2, collected.size());
 
@@ -114,13 +113,15 @@ public class TumblingGroupedPreReducerTest {
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.emitWindow(collector);
-
+		wb.evict(2);
+		
 		assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
 		
 		wb.store(serializer.copy(inputs.get(0)));
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
 		wb.emitWindow(collector);
+		wb.evict(3);
 		
 		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index ddaf747..b8de02e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -59,6 +59,7 @@ public class TumblingPreReducerTest {
 		wb.store(serializer.copy(inputs.get(1)));
 
 		wb.emitWindow(collector);
+		wb.evict(2);
 
 		assertEquals(1, collected.size());
 		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
@@ -68,12 +69,10 @@ public class TumblingPreReducerTest {
 		wb.store(serializer.copy(inputs.get(1)));
 		wb.store(serializer.copy(inputs.get(2)));
 
-		// Nothing should happen here
-		wb.evict(3);
-
 		wb.store(serializer.copy(inputs.get(3)));
 
 		wb.emitWindow(collector);
+		wb.evict(4);
 
 		assertEquals(2, collected.size());
 		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(10, -2)),

http://git-wip-us.apache.org/repos/asf/flink/blob/56cb7937/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 4ccb073..41be9d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -530,6 +530,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def window(trigger: TriggerPolicy[T], eviction: EvictionPolicy[T]):
     WindowedDataStream[T] = javaStream.window(trigger, eviction)
+    
+  /**
+   * Create a WindowedDataStream based on the full stream history to perform periodic
+   * aggregations.
+   */  
+  def every(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] = 
+    javaStream.every(windowingHelper)
 
   /**
    *