You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/05 17:19:16 UTC

[2/4] flink git commit: [FLINK-3614] Remove Non-Keyed Window Operator

[FLINK-3614] Remove Non-Keyed Window Operator

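Non-keyed (all-window) streams now use the regular WindowOperator, keyed by a
dummy KeySelector that maps every element to the same key, instead of a
dedicated non-keyed window operator.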


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/505512db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/505512db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/505512db

Branch: refs/heads/master
Commit: 505512dbe461b9840dde6197c71dbb90b49c0495
Parents: b1e5086
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 16 15:59:05 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 5 16:26:54 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 264 ++++----
 .../EvictingNonKeyedWindowOperator.java         |  88 ---
 .../windowing/NonKeyedWindowOperator.java       | 624 -------------------
 .../operators/windowing/WindowOperator.java     |   5 -
 .../windowing/buffers/EvictingWindowBuffer.java |  38 --
 .../windowing/buffers/FoldingWindowBuffer.java  | 163 -----
 .../windowing/buffers/ListWindowBuffer.java     | 127 ----
 .../windowing/buffers/ReducingWindowBuffer.java | 121 ----
 .../windowing/buffers/WindowBuffer.java         |  73 ---
 .../windowing/buffers/WindowBufferFactory.java  |  47 --
 .../InternalIterableAllWindowFunction.java      |  75 +++
 .../InternalIterableWindowFunction.java         |   5 +-
 .../InternalSingleValueAllWindowFunction.java   |  77 +++
 .../InternalSingleValueWindowFunction.java      |   5 +-
 .../windowing/AllWindowTranslationTest.java     |  54 +-
 .../EvictingNonKeyedWindowOperatorTest.java     | 147 -----
 .../windowing/NonKeyedWindowOperatorTest.java   | 406 ------------
 .../api/scala/AllWindowTranslationTest.scala    |  27 +-
 .../StreamingScalaAPICompletenessTest.scala     |   2 -
 19 files changed, 345 insertions(+), 2003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 54c0b86..1a59bf1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -21,11 +21,14 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -35,17 +38,17 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
@@ -71,8 +74,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWi
 @Public
 public class AllWindowedStream<T, W extends Window> {
 
-	/** The data stream that is windowed by this stream */
-	private final DataStream<T> input;
+	/** The keyed data stream that is windowed by this stream */
+	private final KeyedStream<T, Byte> input;
 
 	/** The window assigner */
 	private final WindowAssigner<? super T, W> windowAssigner;
@@ -87,7 +90,7 @@ public class AllWindowedStream<T, W extends Window> {
 	@PublicEvolving
 	public AllWindowedStream(DataStream<T> input,
 			WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
+		this.input = input.keyBy(new NullByteKeySelector<T>());
 		this.windowAssigner = windowAssigner;
 		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
 	}
@@ -106,7 +109,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 *
 	 * <p>
 	 * Note: When using an evictor window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
+	 * incremental aggregation of window results cannot be used.
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
@@ -123,55 +126,32 @@ public class AllWindowedStream<T, W extends Window> {
 	 * Applies a reduce function to the window. The window function is called for each evaluation
 	 * of the window for each key individually. The output of the reduce function is interpreted
 	 * as a regular non-windowed stream.
+	 *
 	 * <p>
-	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * This window will try and incrementally aggregate data as much as the window policies permit.
+	 * For example, tumbling time windows can aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will aggregate on the granularity of the slide interval,
 	 * so a few elements are stored per key (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree.
-	 * 
+	 * Custom windows may not be able to incrementally aggregate, or may need to store extra values
+	 * in an aggregation tree.
+	 *
 	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 * @return The data stream that is the result of applying the reduce function to the window.
 	 */
+	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
-				"Please use apply(ReduceFunction, WindowFunction) instead.");
+					"Please use apply(ReduceFunction, WindowFunction) instead.");
 		}
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "AllWindowedStream." + callLocation;
-
-		SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		OneInputStreamOperator<T, T> operator;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-					new ReduceIterableAllWindowFunction<W, T>(function),
-					trigger,
-					evictor);
-
-		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-					new ReduceIterableAllWindowFunction<W, T>(function),
-					trigger);
-		}
+		String udfName = "WindowedStream." + callLocation;
 
-		return input.transform(opName, input.getType(), operator).setParallelism(1);
+		return apply(function, new PassThroughAllWindowFunction<W, T>());
 	}
 
 	/**
@@ -185,11 +165,11 @@ public class AllWindowedStream<T, W extends Window> {
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
-				"Please use apply(FoldFunction, WindowFunction) instead.");
+					"Please use apply(FoldFunction, WindowFunction) instead.");
 		}
 
 		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
-			Utils.getCallLocationName(), true);
+				Utils.getCallLocationName(), true);
 
 		return fold(initialValue, function, resultType);
 	}
@@ -205,25 +185,25 @@ public class AllWindowedStream<T, W extends Window> {
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
-				"Please use apply(FoldFunction, WindowFunction) instead.");
+					"Please use apply(FoldFunction, WindowFunction) instead.");
 		}
 
 		return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
 	}
 
 	/**
-	 * Applies a window function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
 	 * <p>
 	 * Not that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 * 
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
-		@SuppressWarnings("unchecked, rawtypes")
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, AllWindowFunction.class, true, true, getInputType(), null, false);
 
@@ -231,46 +211,58 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	/**
-	 * Applies the given window function to each window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
 	 * <p>
 	 * Not that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "AllWindowedStream." + callLocation;
+		String udfName = "WindowedStream." + callLocation;
 
-		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
+		String opName;
+		KeySelector<T, Byte> keySel = input.getKeySelector();
 
+		WindowOperator<Byte, T, Iterable<T>, R, W> operator;
 
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		if (evictor != null) {
+			ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+					new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
 
-		NonKeyedWindowOperator<T, T, R, W> operator;
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+			operator = new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-					function,
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableAllWindowFunction<>(function),
 					trigger,
 					evictor);
 
 		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
+			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-					function,
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableAllWindowFunction<>(function),
 					trigger);
 		}
 
@@ -283,19 +275,19 @@ public class AllWindowedStream<T, W extends Window> {
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 * Arriving data is incrementally aggregated using the given reducer.
 	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 
-	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, AllWindowFunction.class, true, true, inType, null, false);
 
-		return apply(preAggregator, function, resultType);
+		return apply(reduceFunction, function, resultType);
 	}
 
 	/**
@@ -304,43 +296,59 @@ public class AllWindowedStream<T, W extends Window> {
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 * Arriving data is incrementally aggregated using the given reducer.
 	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
-		if (preAggregator instanceof RichFunction) {
-			throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
+	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		if (reduceFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
 		}
 
 		//clean the closures
 		function = input.getExecutionEnvironment().clean(function);
-		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "AllWindowedStream." + callLocation;
+		String udfName = "WindowedStream." + callLocation;
 
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		String opName;
+		KeySelector<T, Byte> keySel = input.getKeySelector();
 
 		OneInputStreamOperator<T, R> operator;
 
 		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+			ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+					new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-					new ReduceApplyAllWindowFunction<>(preAggregator, function),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
 					trigger,
 					evictor);
 
 		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-				function,
-				trigger);
+			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+					reduceFunction,
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueAllWindowFunction<>(function),
+					trigger);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);
@@ -360,8 +368,9 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
+
 		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
-			Utils.getCallLocationName(), true);
+				Utils.getCallLocationName(), true);
 
 		return apply(initialValue, foldFunction, function, resultType);
 	}
@@ -390,37 +399,51 @@ public class AllWindowedStream<T, W extends Window> {
 		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "AllWindowedStream." + callLocation;
+		String udfName = "WindowedStream." + callLocation;
 
 		String opName;
+		KeySelector<T, Byte> keySel = input.getKeySelector();
 
 		OneInputStreamOperator<T, R> operator;
 
 		if (evictor != null) {
-			opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
-				new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
-				trigger,
-				evictor);
+			ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+					new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
+					trigger,
+					evictor);
 
 		} else {
-			opName = "NonParallelTriggerWindow(" + windowAssigner  + ", " + trigger + ", " + udfName + ")";
+			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
+					initialValue,
+					foldFunction,
+					resultType);
 
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
-				function,
-				trigger);
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueAllWindowFunction<>(function),
+					trigger);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);
 	}
 
 	// ------------------------------------------------------------------------
-	//  Aggregations on the  windows
+	//  Aggregations on the keyed windows
 	// ------------------------------------------------------------------------
 
 	/**
@@ -621,16 +644,6 @@ public class AllWindowedStream<T, W extends Window> {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-
-	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		// TODO: add once non-parallel fast aligned time windows operator is ready
-		return null;
-	}
-
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}
@@ -638,4 +651,17 @@ public class AllWindowedStream<T, W extends Window> {
 	public TypeInformation<T> getInputType() {
 		return input.getType();
 	}
+
+	/**
+	 * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
+	 * @param <T>
+	 */
+	private static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Byte getKey(T value) throws Exception {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
deleted file mode 100644
index 22d207d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Evicting window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <ACC> The type of elements stored in the window buffers.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-@Internal
-public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends NonKeyedWindowOperator<IN, ACC, OUT, W> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Evictor<? super IN, ? super W> evictor;
-
-	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ACC, ? extends EvictingWindowBuffer<IN, ACC>> windowBufferFactory,
-			AllWindowFunction<ACC, OUT, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger,
-			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
-		this.evictor = requireNonNull(evictor);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
-		EvictingWindowBuffer<IN, ACC> windowBuffer = (EvictingWindowBuffer<IN, ACC>) context.windowBuffer;
-
-		int toEvict = 0;
-		if (windowBuffer.size() > 0) {
-			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
-		}
-
-		windowBuffer.removeElements(toEvict);
-
-		userFunction.apply(
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
-	}
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public Evictor<? super IN, ? super W> getEvictor() {
-		return evictor;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
deleted file mode 100644
index 6bd5c7d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <ACC> The type of elements stored in the window buffers.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-@Internal
-public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
-	// ------------------------------------------------------------------------
-	// Configuration values and stuff from the user
-	// ------------------------------------------------------------------------
-
-	private final WindowAssigner<? super IN, W> windowAssigner;
-
-	private final Trigger<? super IN, ? super W> trigger;
-
-	private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;
-
-	/**
-	 * This is used to copy the incoming element because it can be put into several window
-	 * buffers.
-	 */
-	private TypeSerializer<IN> inputSerializer;
-
-	/**
-	 * For serializing the window in checkpoints.
-	 */
-	private final TypeSerializer<W> windowSerializer;
-
-	// ------------------------------------------------------------------------
-	// State that is not checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Processing time timers that are currently in-flight.
-	 */
-	private transient Map<Long, Set<Context>> processingTimeTimers;
-
-	/**
-	 * Current waiting watermark callbacks.
-	 */
-	private transient Map<Long, Set<Context>> watermarkTimers;
-
-	/**
-	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
-	 */
-	protected transient TimestampedCollector<OUT> timestampedCollector;
-
-	/**
-	 * To keep track of the current watermark so that we can immediately fire if a trigger
-	 * registers an event time callback for a timestamp that lies in the past.
-	 */
-	protected transient long currentWatermark = -1L;
-
-	// ------------------------------------------------------------------------
-	// State that needs to be checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
-	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
-	 */
-	protected transient Map<W, Context> windows;
-
-	/**
-	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
-	 */
-	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory,
-			AllWindowFunction<ACC, OUT, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger) {
-
-		super(windowFunction);
-
-		this.windowAssigner = requireNonNull(windowAssigner);
-		this.windowSerializer = windowSerializer;
-
-		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.trigger = requireNonNull(trigger);
-
-		setChainingStrategy(ChainingStrategy.ALWAYS);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		currentWatermark = -1;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
-	}
-
-	@Override
-	public final void open() throws Exception {
-		super.open();
-		timestampedCollector = new TimestampedCollector<>(output);
-
-		if (inputSerializer == null) {
-			throw new IllegalStateException("Input serializer was not set.");
-		}
-
-		// these could already be initialized from restoreState()
-		if (watermarkTimers == null) {
-			watermarkTimers = new HashMap<>();
-		}
-		if (processingTimeTimers == null) {
-			processingTimeTimers = new HashMap<>();
-		}
-		if (windows == null) {
-			windows = new HashMap<>();
-		}
-
-		// re-register timers that this window context had set
-		for (Context context: windows.values()) {
-			if (context.processingTimeTimer > 0) {
-				Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
-				if (triggers == null) {
-					getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
-					triggers = new HashSet<>();
-					processingTimeTimers.put(context.processingTimeTimer, triggers);
-				}
-				triggers.add(context);
-			}
-			if (context.watermarkTimer > 0) {
-				Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
-				if (triggers == null) {
-					triggers = new HashSet<>();
-					watermarkTimers.put(context.watermarkTimer, triggers);
-				}
-				triggers.add(context);
-			}
-
-		}
-	}
-
-	@Override
-	public final void dispose() {
-		super.dispose();
-		windows.clear();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
-		for (W window: elementWindows) {
-			Context context = windows.get(window);
-			if (context == null) {
-				WindowBuffer<IN, ACC> windowBuffer = windowBufferFactory.create();
-				context = new Context(window, windowBuffer);
-				windows.put(window, context);
-			}
-			context.windowBuffer.storeElement(element);
-			TriggerResult triggerResult = context.onElement(element);
-			processTriggerResult(triggerResult, window);
-		}
-	}
-
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
-
-		if (context.windowBuffer.size() > 0) {
-			userFunction.apply(
-					context.window,
-					context.windowBuffer.getUnpackedElements(),
-					timestampedCollector);
-		}
-	}
-
-	private void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
-		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
-			// do nothing
-			return;
-		}
-		Context context;
-
-		if (triggerResult.isPurge()) {
-			context = windows.remove(window);
-		} else {
-			context = windows.get(window);
-		}
-		if (context == null) {
-			LOG.debug("Window {} already gone.", window);
-			return;
-		}
-
-		if (triggerResult.isFire()) {
-			emitWindow(context);
-		}
-
-		if (triggerResult.isPurge()) {
-			context.clear();
-		}
-	}
-
-	@Override
-	public final void processWatermark(Watermark mark) throws Exception {
-		List<Set<Context>> toTrigger = new ArrayList<>();
-
-		Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
-
-		while (it.hasNext()) {
-			Map.Entry<Long, Set<Context>> triggers = it.next();
-			if (triggers.getKey() <= mark.getTimestamp()) {
-				toTrigger.add(triggers.getValue());
-				it.remove();
-			}
-		}
-
-		for (Set<Context> ctxs: toTrigger) {
-			for (Context ctx: ctxs) {
-				// double check the time. it can happen that the trigger registers a new timer,
-				// in that case the entry is left in the watermarkTimers set for performance reasons.
-				// We have to check here whether the entry in the set still reflects the
-				// currently set timer in the Context.
-				if (ctx.watermarkTimer <= mark.getTimestamp()) {
-					TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
-					processTriggerResult(triggerResult, ctx.window);
-				}
-			}
-		}
-
-		output.emitWatermark(mark);
-
-		this.currentWatermark = mark.getTimestamp();
-	}
-
-	@Override
-	public final void trigger(long time) throws Exception {
-		List<Set<Context>> toTrigger = new ArrayList<>();
-
-		Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
-
-		while (it.hasNext()) {
-			Map.Entry<Long, Set<Context>> triggers = it.next();
-			if (triggers.getKey() <= time) {
-				toTrigger.add(triggers.getValue());
-				it.remove();
-			}
-		}
-
-		for (Set<Context> ctxs: toTrigger) {
-			for (Context ctx: ctxs) {
-				// double check the time. it can happen that the trigger registers a new timer,
-				// in that case the entry is left in the processingTimeTimers set for
-				// performance reasons. We have to check here whether the entry in the set still
-				// reflects the currently set timer in the Context.
-				if (ctx.processingTimeTimer <= time) {
-					TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
-					processTriggerResult(triggerResult, ctx.window);
-				}
-			}
-		}
-	}
-
-	/**
-	 * The {@code Context} is responsible for keeping track of the state of one pane.
-	 *
-	 * <p>
-	 * A pane is the bucket of elements that have the same key (assigned by the
-	 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
-	 * be in multiple panes of it was assigned to multiple windows by the
-	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
-	 * have their own instance of the {@code Trigger}.
-	 */
-	protected class Context implements TriggerContext {
-		protected W window;
-
-		protected WindowBuffer<IN, ACC> windowBuffer;
-
-		protected HashMap<String, Serializable> state;
-
-		// use these to only allow one timer in flight at a time of each type
-		// if the trigger registers another timer this value here will be overwritten,
-		// the timer is not removed from the set of in-flight timers to improve performance.
-		// When a trigger fires it is just checked against the last timer that was set.
-		protected long watermarkTimer;
-		protected long processingTimeTimer;
-
-		public Context(
-				W window,
-				WindowBuffer<IN, ACC> windowBuffer) {
-			this.window = window;
-			this.windowBuffer = windowBuffer;
-			state = new HashMap<>();
-
-			this.watermarkTimer = -1;
-			this.processingTimeTimer = -1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return currentWatermark;
-		}
-
-		@SuppressWarnings("unchecked")
-		protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
-			this.window = windowSerializer.deserialize(in);
-			this.watermarkTimer = in.readLong();
-			this.processingTimeTimer = in.readLong();
-
-			int stateSize = in.readInt();
-			byte[] stateData = new byte[stateSize];
-			in.read(stateData);
-			state = InstantiationUtil.deserializeObject(stateData, userClassloader);
-
-			this.windowBuffer = windowBufferFactory.restoreFromSnapshot(in);
-		}
-
-		protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
-			windowSerializer.serialize(window, out);
-			out.writeLong(watermarkTimer);
-			out.writeLong(processingTimeTimer);
-
-			byte[] serializedState = InstantiationUtil.serializeObject(state);
-			out.writeInt(serializedState.length);
-			out.write(serializedState, 0, serializedState.length);
-
-			windowBuffer.snapshot(out);
-		}
-
-		@Override
-		public <S extends Serializable> ValueState<S> getKeyValueState(String name,
-			Class<S> stateType,
-			S defaultState) {
-			requireNonNull(stateType, "The state type class must not be null");
-
-			TypeInformation<S> typeInfo;
-			try {
-				typeInfo = TypeExtractor.getForClass(stateType);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Cannot analyze type '" + stateType.getName() +
-					"' from the class alone, due to generic type parameters. " +
-					"Please specify the TypeInformation directly.", e);
-			}
-
-			return getKeyValueState(name, typeInfo, defaultState);
-		}
-
-		@Override
-		public <S extends Serializable> ValueState<S> getKeyValueState(String name,
-			TypeInformation<S> stateType,
-			S defaultState) {
-
-			requireNonNull(name, "The name of the state must not be null");
-			requireNonNull(stateType, "The state type information must not be null");
-
-			ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(
-					name, stateType.createSerializer(getExecutionConfig()), defaultState);
-			return getPartitionedState(stateDesc);
-		}
-
-		@Override
-		@SuppressWarnings("rawtypes, unchecked")
-		public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) {
-			if (!(stateDescriptor instanceof ValueStateDescriptor)) {
-				throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only " +
-					"support ValueState.");
-			}
-			@SuppressWarnings("unchecked")
-			final ValueStateDescriptor<?> valueStateDescriptor = (ValueStateDescriptor<?>) stateDescriptor;
-			ValueState valueState = new ValueState() {
-				@Override
-				public Object value() throws IOException {
-					Object value = state.get(stateDescriptor.getName());
-					if (value == null) {
-						value = valueStateDescriptor.getDefaultValue();
-						state.put(stateDescriptor.getName(), (Serializable) value);
-					}
-					return value;
-				}
-
-				@Override
-				public void update(Object value) throws IOException {
-					if (!(value instanceof Serializable)) {
-						throw new UnsupportedOperationException(
-							"Value state of NonKeyedWindowOperator must be serializable.");
-					}
-					state.put(stateDescriptor.getName(), (Serializable) value);
-				}
-
-				@Override
-				public void clear() {
-					state.remove(stateDescriptor.getName());
-				}
-			};
-			return (S) valueState;
-		}
-
-		@Override
-		public void registerProcessingTimeTimer(long time) {
-			if (this.processingTimeTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = processingTimeTimers.get(time);
-			if (triggers == null) {
-				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
-				triggers = new HashSet<>();
-				processingTimeTimers.put(time, triggers);
-			}
-			this.processingTimeTimer = time;
-			triggers.add(this);
-		}
-
-		@Override
-		public void registerEventTimeTimer(long time) {
-			if (watermarkTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = watermarkTimers.get(time);
-			if (triggers == null) {
-				triggers = new HashSet<>();
-				watermarkTimers.put(time, triggers);
-			}
-			this.watermarkTimer = time;
-			triggers.add(this);
-		}
-
-		@Override
-		public void deleteProcessingTimeTimer(long time) {
-			Set<Context> triggers = processingTimeTimers.get(time);
-			if (triggers != null) {
-				triggers.remove(this);
-			}
-		}
-
-		@Override
-		public void deleteEventTimeTimer(long time) {
-			Set<Context> triggers = watermarkTimers.get(time);
-			if (triggers != null) {
-				triggers.remove(this);
-			}
-
-		}
-
-		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
-			TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
-			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
-				// fire now and don't wait for the next watermark update
-				TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
-				return TriggerResult.merge(onElementResult, onEventTimeResult);
-			} else {
-				return onElementResult;
-			}
-		}
-
-		public TriggerResult onProcessingTime(long time) throws Exception {
-			if (time == processingTimeTimer) {
-				processingTimeTimer = -1;
-				return trigger.onProcessingTime(time, window, this);
-			} else {
-				return TriggerResult.CONTINUE;
-			}
-		}
-
-		public TriggerResult onEventTime(long time) throws Exception {
-			if (time == watermarkTimer) {
-				watermarkTimer = -1;
-				TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
-
-				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
-					// fire now and don't wait for the next watermark update
-					TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
-					return TriggerResult.merge(firstTriggerResult, secondTriggerResult);
-				} else {
-					return firstTriggerResult;
-				}
-
-			} else {
-				return TriggerResult.CONTINUE;
-			}
-		}
-
-		public void clear() throws Exception {
-			trigger.clear(window, this);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-		// we write the panes with the key/value maps into the stream
-		AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-		int numWindows = windows.size();
-		out.writeInt(numWindows);
-		for (Context context: windows.values()) {
-			context.writeToState(out);
-		}
-
-		taskState.setOperatorState(out.closeAndGetHandle());
-		return taskState;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-		super.restoreState(taskState, recoveryTimestamp);
-
-		final ClassLoader userClassloader = getUserCodeClassloader();
-		@SuppressWarnings("unchecked")
-		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(userClassloader);
-
-		int numWindows = in.readInt();
-		this.windows = new HashMap<>(numWindows);
-		this.processingTimeTimers = new HashMap<>();
-		this.watermarkTimers = new HashMap<>();
-
-		for (int j = 0; j < numWindows; j++) {
-			Context context = new Context(in, userClassloader);
-			windows.put(context.window, context);
-		}
-	}
-
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTrigger() {
-		return trigger;
-	}
-
-	@VisibleForTesting
-	public WindowAssigner<? super IN, W> getWindowAssigner() {
-		return windowAssigner;
-	}
-
-	@VisibleForTesting
-	public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
-		return windowBufferFactory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9b7b347..ecad9b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
@@ -77,10 +76,6 @@ import static java.util.Objects.requireNonNull;
  * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
  * the pane to which the {@code Trigger} belongs.
  *
- * <p>
- * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
- * elements of each pane.
- *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
  * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
deleted file mode 100644
index 75f646d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
- * the elements are added is preserved. Elements can only be evicted started from the beginning of
- * the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- * @param <O> The type of elements that this window buffer will return when asked for its contents.
- */
-@Internal
-public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> {
-
-	/**
-	 * Removes the given number of elements, starting from the beginning.
-	 * @param count The number of elements to remove.
-	 */
-	void removeElements(int count);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
deleted file mode 100644
index f6c2319..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link FoldFunction} to incrementally aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that can be added to this {@code WindowBuffer}.
- * @param <ACC> The type of the accumulator that this {@code WindowBuffer} can store.
- */
-@Internal
-public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
-
-	private final FoldFunction<T, ACC> foldFunction;
-	private final TypeSerializer<ACC> accSerializer;
-	private StreamRecord<ACC> data;
-
-	protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC initialAccumulator, TypeSerializer<ACC> accSerializer) {
-		this.foldFunction = foldFunction;
-		this.accSerializer = accSerializer;
-		this.data = new StreamRecord<>(initialAccumulator);
-	}
-
-	protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) {
-		this.foldFunction = foldFunction;
-		this.accSerializer = accSerializer;
-		this.data = initialAccumulator;
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) throws Exception {
-		data.replace(foldFunction.fold(data.getValue(), element.getValue()));
-	}
-
-	@Override
-	public Iterable<StreamRecord<ACC>> getElements() {
-		return Collections.singleton(data);
-	}
-
-	@Override
-	public Iterable<ACC> getUnpackedElements() {
-		return Collections.singleton(data.getValue());
-	}
-
-	@Override
-	public int size() {
-		return 1;
-	}
-
-	@Override
-	public void snapshot(DataOutputView out) throws IOException {
-		MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
-		recordSerializer.serialize(data, out);
-	}
-
-	public static class Factory<T, ACC> implements WindowBufferFactory<T, ACC, FoldingWindowBuffer<T, ACC>> {
-		private static final long serialVersionUID = 1L;
-
-		private final FoldFunction<T, ACC> foldFunction;
-
-		private final TypeSerializer<ACC> accSerializer;
-
-		private transient ACC initialAccumulator;
-
-		public Factory(FoldFunction<T, ACC> foldFunction, ACC initialValue, TypeSerializer<ACC> accSerializer) {
-			this.foldFunction = foldFunction;
-			this.accSerializer = accSerializer;
-			this.initialAccumulator = initialValue;
-		}
-
-		@Override
-		public FoldingWindowBuffer<T, ACC> create() {
-			return new FoldingWindowBuffer<>(foldFunction, accSerializer.copy(initialAccumulator), accSerializer);
-		}
-
-		@Override
-		public FoldingWindowBuffer<T, ACC> restoreFromSnapshot(DataInputView in) throws IOException {
-			MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
-			StreamElement element = recordSerializer.deserialize(in);
-			return new FoldingWindowBuffer<>(foldFunction, element.<ACC>asRecord(), accSerializer);
-		}
-
-		private void writeObject(final ObjectOutputStream out) throws IOException {
-			// write all the non-transient fields
-			out.defaultWriteObject();
-
-
-			byte[] serializedDefaultValue;
-			try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
-			{
-				accSerializer.serialize(initialAccumulator, outView);
-
-				outView.flush();
-				serializedDefaultValue = baos.toByteArray();
-			}
-			catch (Exception e) {
-				throw new IOException("Unable to serialize initial accumulator of type " +
-						initialAccumulator.getClass().getSimpleName() + ".", e);
-			}
-
-			out.writeInt(serializedDefaultValue.length);
-			out.write(serializedDefaultValue);
-		}
-
-		private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-			// read the non-transient fields
-			in.defaultReadObject();
-
-			// read the default value field
-			int size = in.readInt();
-			byte[] buffer = new byte[size];
-			int bytesRead = in.read(buffer);
-
-			if (bytesRead != size) {
-				throw new RuntimeException("Read size does not match expected size.");
-			}
-
-			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
-			{
-				initialAccumulator = accSerializer.deserialize(inView);
-			}
-			catch (Exception e) {
-				throw new IOException("Unable to deserialize initial accumulator.", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
deleted file mode 100644
index 5b9dd3c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class ListWindowBuffer<T> implements EvictingWindowBuffer<T, T> {
-
-	private final TypeSerializer<T>  serializer;
-
-	private ArrayDeque<StreamRecord<T>> elements;
-
-	protected ListWindowBuffer(TypeSerializer<T> serializer) {
-		this.serializer = serializer;
-		this.elements = new ArrayDeque<>();
-	}
-
-	protected ListWindowBuffer(ArrayDeque<StreamRecord<T>> elements, TypeSerializer<T> serializer) {
-		this.serializer = serializer;
-		this.elements = elements;
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) {
-		elements.add(element);
-	}
-
-	@Override
-	public void removeElements(int count) {
-		// TODO determine if this can be done in a better way
-		for (int i = 0; i < count; i++) {
-			elements.removeFirst();
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return elements;
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
-			@Override
-			public T apply(StreamRecord<T> record) {
-				return record.getValue();
-			}
-		});
-	}
-
-	@Override
-	public int size() {
-		return elements.size();
-	}
-
-	@Override
-	public void snapshot(DataOutputView out) throws IOException {
-		out.writeInt(elements.size());
-
-		MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-
-		for (StreamRecord<T> e: elements) {
-			recordSerializer.serialize(e, out);
-		}
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, T, ListWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		private final TypeSerializer<T> serializer;
-
-		public Factory(TypeSerializer<T> serializer) {
-			this.serializer = serializer;
-		}
-
-		@Override
-		public ListWindowBuffer<T> create() {
-			return new ListWindowBuffer<>(serializer);
-		}
-
-		@Override
-		public ListWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
-			int size = in.readInt();
-
-			MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-
-			ArrayDeque<StreamRecord<T>> elements = new ArrayDeque<>();
-
-			for (int i = 0; i < size; i++) {
-				elements.add(recordSerializer.deserialize(in).<T>asRecord());
-			}
-
-			return new ListWindowBuffer<>(elements, serializer);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
deleted file mode 100644
index d3bf4b4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link ReduceFunction} to incrementally aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
-
-	private final ReduceFunction<T> reduceFunction;
-	private final TypeSerializer<T> serializer;
-	private  StreamRecord<T> data;
-
-	protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
-		this.reduceFunction = reduceFunction;
-		this.serializer = serializer;
-		this.data = null;
-	}
-
-	protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, StreamRecord<T> data, TypeSerializer<T> serializer) {
-		this.reduceFunction = reduceFunction;
-		this.serializer = serializer;
-		this.data = data;
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) throws Exception {
-		if (data == null) {
-			data = element.copy(element.getValue());
-		} else {
-			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return Collections.singleton(data);
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return Collections.singleton(data.getValue());
-	}
-
-	@Override
-	public int size() {
-		return 1;
-	}
-
-	@Override
-	public void snapshot(DataOutputView out) throws IOException {
-		if (data != null) {
-			out.writeBoolean(true);
-			MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-			recordSerializer.serialize(data, out);
-		} else {
-			out.writeBoolean(false);
-		}
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, T, ReducingWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		private final ReduceFunction<T> reduceFunction;
-
-		private final TypeSerializer<T> serializer;
-
-		public Factory(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
-			this.reduceFunction = reduceFunction;
-			this.serializer = serializer;
-		}
-
-		@Override
-		public ReducingWindowBuffer<T> create() {
-			return new ReducingWindowBuffer<>(reduceFunction, serializer);
-		}
-
-		@Override
-		public ReducingWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
-			boolean hasValue = in.readBoolean();
-			if (hasValue) {
-				MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-				StreamElement element = recordSerializer.deserialize(in);
-				return new ReducingWindowBuffer<>(reduceFunction, element.<T>asRecord(), serializer);
-			} else {
-				return new ReducingWindowBuffer<>(reduceFunction, serializer);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
deleted file mode 100644
index 16be0f3..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-
-/**
- * A {@code WindowBuffer} is used by
- * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
- * the elements of one pane.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- * @param <O> The type of elements that this window buffer will return when asked for its contents.
- */
-@Internal
-public interface WindowBuffer<T, O> {
-
-	/**
-	 * Adds the element to the buffer.
-	 *
-	 * @param element The element to add.
-	 */
-	void storeElement(StreamRecord<T> element) throws Exception;
-
-	/**
-	 * Returns all elements that are currently in the buffer.
-	 */
-	Iterable<StreamRecord<O>> getElements();
-
-	/**
-	 * Returns all elements that are currently in the buffer. This will unwrap the contained
-	 * elements from their {@link StreamRecord}.
-	 */
-	Iterable<O> getUnpackedElements();
-
-	/**
-	 * Returns the number of elements that are currently in the buffer.
-	 */
-	int size();
-
-	/**
-	 * Writes the contents of the window buffer to a {@link DataOutputView} for checkpointing.
-	 */
-	void snapshot(DataOutputView out) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
deleted file mode 100644
index 1ca6350..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A factory for {@link WindowBuffer WindowBuffers}.
- *
- * @param <T> The type of elements that the created {@code WindowBuffer} can store.
- * @param <O> The type of elements that the created buffer will return when asked for its contents.
- * @param <B> The type of the created {@code WindowBuffer}
- */
-@Internal
-public interface WindowBufferFactory<T, O, B extends WindowBuffer<T, O>> extends Serializable {
-
-	/**
-	 * Creates a new {@code WindowBuffer}.
-	 */
-	B create();
-
-	/**
-	 * Restores a {@code WindowBuffer} from a previous snapshot written using
-	 * {@link WindowBuffer#snapshot(DataOutputView)}.
-	 */
-	B restoreFromSnapshot(DataInputView in) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
new file mode 100644
index 0000000..3a4be91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Wraps an {@link AllWindowFunction} whose input and window state are both an {@code Iterable}.
+ * {@code apply} ignores its {@code Byte} key; lifecycle calls delegate via {@link FunctionUtils}.
+ */
+public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
+		extends InternalWindowFunction<Iterable<IN>, OUT, Byte, W>
+		implements RichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
+
+	public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
+		this.wrappedFunction = wrappedFunction;
+	}
+
+	@Override
+	public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		wrappedFunction.apply(window, input, out);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		FunctionUtils.openFunction(this.wrappedFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.wrappedFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		// Unsupported on this wrapper, same as getRuntimeContext(): always throws.
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 7b441fb..822a57c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -30,7 +30,10 @@ import org.apache.flink.util.Collector;
  * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
  * when the window state also is an {@code Iterable}.
  */
-public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
+public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window>
+		extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W>
+		implements RichFunction {
+
 	private static final long serialVersionUID = 1L;
 
 	protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
new file mode 100644
index 0000000..aa6e196
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Wraps an {@link AllWindowFunction} that takes an {@code Iterable} when the window state is a
+ * single value: {@code apply} ignores its {@code Byte} key and passes a one-element list.
+ */
+public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Window>
+		extends InternalWindowFunction<IN, OUT, Byte, W>
+		implements RichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
+
+	public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
+		this.wrappedFunction = wrappedFunction;
+	}
+
+	@Override
+	public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+		wrappedFunction.apply(window, Collections.singletonList(input), out);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		FunctionUtils.openFunction(this.wrappedFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.wrappedFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		// Unsupported on this wrapper, same as getRuntimeContext(): always throws.
+		throw new RuntimeException("This should never be called.");
+	}
+}