You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/28 13:13:43 UTC

[2/2] flink git commit: [FLINK-5157] [streaming] Introduce ProcessAllWindowFunction

[FLINK-5157] [streaming] Introduce ProcessAllWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/788b8392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/788b8392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/788b8392

Branch: refs/heads/master
Commit: 788b839213811c6f2407ac6d54fef28dfa3d29a6
Parents: 87b9077
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Feb 22 14:55:17 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Feb 28 14:02:56 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 398 ++++++++++++++++-
 .../FoldApplyProcessAllWindowFunction.java      | 120 +++++
 .../windowing/ProcessAllWindowFunction.java     |  59 +++
 .../ReduceApplyProcessAllWindowFunction.java    |  80 ++++
 .../windowing/RichProcessAllWindowFunction.java |  84 ++++
 ...ternalAggregateProcessAllWindowFunction.java |  83 ++++
 ...nternalIterableProcessAllWindowFunction.java |  63 +++
 ...rnalSingleValueProcessAllWindowFunction.java |  65 +++
 ...nternalSingleValueProcessWindowFunction.java |   3 +-
 .../FoldApplyProcessWindowFunctionTest.java     |  99 +++++
 .../operators/StateDescriptorPassingTest.java   |  19 +
 .../functions/InternalWindowFunctionTest.java   | 193 +++++++-
 .../windowing/AllWindowTranslationTest.java     | 445 +++++++++++++++++++
 .../streaming/api/scala/AllWindowedStream.scala | 186 +++++++-
 .../function/ProcessAllWindowFunction.scala     |  59 +++
 .../function/RichProcessAllWindowFunction.scala |  86 ++++
 .../ScalaProcessWindowFunctionWrapper.scala     |  85 +++-
 .../api/scala/AllWindowTranslationTest.scala    | 410 ++++++++++++++++-
 .../streaming/api/scala/WindowFoldITCase.scala  |  60 ++-
 .../api/scala/WindowFunctionITCase.scala        |  51 ++-
 .../api/scala/WindowReduceITCase.scala          |  59 ++-
 ...ngIdentityRichProcessAllWindowFunction.scala |  81 ++++
 .../streaming/runtime/WindowFoldITCase.java     |  73 +++
 23 files changed, 2830 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 742a2ed..a45cb0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -40,9 +40,12 @@ import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.AggregateApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -52,8 +55,12 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -287,6 +294,102 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The process window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> reduce(
+			ReduceFunction<T> reduceFunction,
+			ProcessAllWindowFunction<T, R, W> function) {
+
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+			function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false);
+
+		return reduce(reduceFunction, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The process window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		if (reduceFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
+		}
+
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "AllWindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, Byte> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+				(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				reduceFunction,
+				input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+				new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueProcessAllWindowFunction<>(function),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator).forceNonParallel();
+	}
+
 	// ------------------------------------------------------------------------
 	//  AggregateFunction
 	// ------------------------------------------------------------------------
@@ -483,6 +586,137 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggFunction The aggregate function that is used for incremental aggregation.
+	 * @param windowFunction The process window function.
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessAllWindowFunction<V, R, W> windowFunction) {
+
+		checkNotNull(aggFunction, "aggFunction");
+		checkNotNull(windowFunction, "windowFunction");
+
+		TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false);
+
+		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+	 * @param windowFunction The process window function.
+	 * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+	 * @param resultType Type information for the result type of the window function
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggregateFunction,
+			ProcessAllWindowFunction<V, R, W> windowFunction,
+			TypeInformation<ACC> accumulatorType,
+			TypeInformation<V> aggregateResultType,
+			TypeInformation<R> resultType) {
+
+		checkNotNull(aggregateFunction, "aggregateFunction");
+		checkNotNull(windowFunction, "windowFunction");
+		checkNotNull(accumulatorType, "accumulatorType");
+		checkNotNull(aggregateResultType, "aggregateResultType");
+		checkNotNull(resultType, "resultType");
+
+		if (aggregateFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+		final String callLocation = Utils.getCallLocationName();
+		final String udfName = "AllWindowedStream." + callLocation;
+
+		final String opName;
+		final KeySelector<T, Byte> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(
+							input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
+					"window-contents",
+					aggregateFunction,
+					accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(
+					windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueProcessAllWindowFunction<>(windowFunction),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator).forceNonParallel();
+	}
+
 	// ------------------------------------------------------------------------
 	//  FoldFunction
 	// ------------------------------------------------------------------------
@@ -630,13 +864,119 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).forceNonParallel();
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
+
+		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+			Utils.getCallLocationName(), true);
+
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+			function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+
+		return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param function The process window function.
+	 * @param foldAccumulatorType Type information for the result type of the fold function
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
+			FoldFunction<T, ACC> foldFunction,
+			ProcessAllWindowFunction<ACC, R, W> function,
+			TypeInformation<ACC> foldAccumulatorType,
+			TypeInformation<R> resultType) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction.");
+		}
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+		}
+
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "AllWindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, Byte> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+				(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+				new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
+				initialValue, foldFunction, foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+				new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueProcessAllWindowFunction<>(function),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator).forceNonParallel();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Apply (Window Function)
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
+	 * evaluation of the window. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
@@ -647,15 +987,16 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
+		String callLocation = Utils.getCallLocationName();
+		function = input.getExecutionEnvironment().clean(function);
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, AllWindowFunction.class, true, true, getInputType(), null, false);
-
-		return apply(function, resultType);
+		return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
 	}
 
 	/**
 	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
+	 * evaluation of the window. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
@@ -663,15 +1004,56 @@ public class AllWindowedStream<T, W extends Window> {
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		String callLocation = Utils.getCallLocationName();
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
+	}
 
-		//clean the closure
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The process window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
+		String callLocation = Utils.getCallLocationName();
 		function = input.getExecutionEnvironment().clean(function);
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false);
+		return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The process window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
 		String callLocation = Utils.getCallLocationName();
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+	}
+
+	private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, Byte, W> function, TypeInformation<R> resultType, String callLocation) {
+
 		String udfName = "AllWindowedStream." + callLocation;
 
 		String opName;
@@ -695,7 +1077,7 @@ public class AllWindowedStream<T, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableAllWindowFunction<>(function),
+					function,
 					trigger,
 					evictor,
 					allowedLateness);
@@ -712,7 +1094,7 @@ public class AllWindowedStream<T, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableAllWindowFunction<>(function),
+					function,
 					trigger,
 					allowedLateness);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..5ac6766
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
+	extends RichProcessAllWindowFunction<T, R, W>
+	implements OutputTypeConfigurable<R> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final FoldFunction<T, ACC> foldFunction;
+	private final ProcessAllWindowFunction<ACC, R, W> windowFunction;
+
+	private byte[] serializedInitialValue;
+	private TypeSerializer<ACC> accSerializer;
+	private final TypeInformation<ACC> accTypeInformation;
+	private transient ACC initialValue;
+
+	public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
+		this.windowFunction = windowFunction;
+		this.foldFunction = foldFunction;
+		this.initialValue = initialValue;
+		this.accTypeInformation = accTypeInformation;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+				"window function. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+		initialValue = accSerializer.deserialize(in);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+	@Override
+	public void process(final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+		ACC result = accSerializer.copy(initialValue);
+
+		for (T val : values) {
+			result = foldFunction.fold(result, val);
+		}
+
+		windowFunction.process(windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+		accSerializer = accTypeInformation.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+
+		try {
+			accSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+				initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
new file mode 100644
index 0000000..622e020
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over non-keyed windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Evaluates the window and outputs none or several elements.
+	 *
+	 * @param context The context in which the window is being evaluated.
+	 * @param elements The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
+
+	/**
+	 * The context holding window metadata
+	 */
+	public abstract class Context {
+		/**
+		 * @return The window that is being evaluated.
+		 */
+		public abstract W window();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..142c71e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
+	extends RichProcessAllWindowFunction<T, R, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+	private final ProcessAllWindowFunction<T, R, W> windowFunction;
+
+	public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
+		this.windowFunction = windowFunction;
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception {
+
+		T curr = null;
+		for (T val: input) {
+			if (curr == null) {
+				curr = val;
+			} else {
+				curr = reduceFunction.reduce(curr, val);
+			}
+		}
+		windowFunction.process(windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
new file mode 100644
index 0000000..1130fa5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
+		extends ProcessAllWindowFunction<IN, OUT, W>
+		implements RichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Runtime context access
+	// --------------------------------------------------------------------------------------------
+
+	private transient RuntimeContext runtimeContext;
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		this.runtimeContext = t;
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return this.runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		if (this.runtimeContext == null) {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
+			return (IterationRuntimeContext) this.runtimeContext;
+		} else {
+			throw new IllegalStateException("This stub is not part of an iteration step function.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Default life cycle methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {}
+
+	@Override
+	public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
new file mode 100644
index 0000000..9533c95
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an
+ * {@code Iterable} and an {@link AggregateFunction}.
+ *
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W extends Window>
+		extends WrappingFunction<ProcessAllWindowFunction<V, R, W>>
+		implements InternalWindowFunction<Iterable<T>, R, Byte, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final AggregateFunction<T, ACC, V> aggFunction;
+
+	public InternalAggregateProcessAllWindowFunction(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessAllWindowFunction<V, R, W> windowFunction) {
+		super(windowFunction);
+		this.aggFunction = aggFunction;
+	}
+
+	@Override
+	public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+		ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		final ACC acc = aggFunction.createAccumulator();
+
+		for (T val : input) {
+			aggFunction.add(val, acc);
+		}
+
+		wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
new file mode 100644
index 0000000..e33cc2a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends Window>
+		extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+		implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		wrappedFunction.process(context, input, out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
new file mode 100644
index 0000000..0284ef7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extends Window>
+		extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+		implements InternalWindowFunction<IN, OUT, Byte, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		wrappedFunction.process(context, Collections.singletonList(input), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index b28c208..7a4e8c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -21,14 +21,13 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 import java.util.Collections;
 
 /**
- * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
  * when the window state is a single value.
  */
 public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window>

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index af5c77a..734879d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -22,14 +22,17 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -39,6 +42,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -145,6 +149,101 @@ public class FoldApplyProcessWindowFunctionTest {
 		Assert.assertEquals(expected, result);
 	}
 
+		/**
+	 * Tests that the FoldWindowFunction gets the output type serializer set by the
+	 * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result.
+	 */
+	@Test
+	public void testFoldAllWindowFunctionOutputTypeConfigurable() throws Exception{
+		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+
+		List<StreamTransformation<?>> transformations = new ArrayList<>();
+
+		int initValue = 1;
+
+		FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>(
+			initValue,
+			new FoldFunction<Integer, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Integer value) throws Exception {
+					return accumulator + value;
+				}
+
+			},
+			new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
+				@Override
+				public void process(Context context,
+									Iterable<Integer> input,
+									Collector<Integer> out) throws Exception {
+					for (Integer in: input) {
+						out.collect(in);
+					}
+				}
+			},
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		AccumulatingProcessingTimeWindowOperator<Byte, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+			new InternalIterableProcessAllWindowFunction<>(foldWindowFunction),
+			new KeySelector<Integer, Byte>() {
+				private static final long serialVersionUID = -7951310554369722809L;
+
+				@Override
+				public Byte getKey(Integer value) throws Exception {
+					return 0;
+				}
+			},
+			ByteSerializer.INSTANCE,
+			IntSerializer.INSTANCE,
+			3000,
+			3000
+		);
+
+		SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
+
+			private static final long serialVersionUID = 8297735565464653028L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		};
+
+		SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1);
+
+		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+
+		List<Integer> result = new ArrayList<>();
+		List<Integer> input = new ArrayList<>();
+		List<Integer> expected = new ArrayList<>();
+
+		input.add(1);
+		input.add(2);
+		input.add(3);
+
+		for (int value : input) {
+			initValue += value;
+		}
+
+		expected.add(initValue);
+
+		foldWindowFunction.process(foldWindowFunction.new Context() {
+			@Override
+			public TimeWindow window() {
+				return new TimeWindow(0, 1);
+			}
+		}, input, new ListCollector<>(result));
+
+		Assert.assertEquals(expected, result);
+	}
+
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 813ca96..f306231 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -162,6 +163,24 @@ public class StateDescriptorPassingTest {
 	}
 
 	@Test
+	public void testProcessAllWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.process(new ProcessAllWindowFunction<File, String, TimeWindow>() {
+					@Override
+					public void process(Context ctx, Iterable<File> input, Collector<String> out) {}
+				});
+
+		validateListStateDescriptorConfigured(result);
+	}
+
+	@Test
 	public void testFoldWindowAllState() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index e49a496..8f795e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -26,15 +26,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.util.Collector;
@@ -99,6 +103,47 @@ public class InternalWindowFunctionTest {
 
 	@SuppressWarnings("unchecked")
 	@Test
+	public void testInternalIterableProcessAllWindowFunction() throws Exception {
+
+		ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class);
+		InternalIterableProcessAllWindowFunction<Long, String, TimeWindow> windowFunction =
+			new InternalIterableProcessAllWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(((byte)0), w, i, c);
+		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
 	public void testInternalIterableWindowFunction() throws Exception {
 
 		WindowFunctionMock mock = mock(WindowFunctionMock.class);
@@ -263,6 +308,47 @@ public class InternalWindowFunctionTest {
 
 	@SuppressWarnings("unchecked")
 	@Test
+	public void testInternalSingleValueProcessAllWindowFunction() throws Exception {
+
+		ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class);
+		InternalSingleValueProcessAllWindowFunction<Long, String, TimeWindow> windowFunction =
+			new InternalSingleValueProcessAllWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(((byte)0), w, 23L, c);
+		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
 	public void testInternalSingleValueProcessWindowFunction() throws Exception {
 
 		ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
@@ -310,7 +396,7 @@ public class InternalWindowFunctionTest {
 		InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction =
 				new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
 					private static final long serialVersionUID = 1L;
-					
+
 					@Override
 					public Set<Long> createAccumulator() {
 						return new HashSet<>();
@@ -364,7 +450,7 @@ public class InternalWindowFunctionTest {
 		List<Long> args = new LinkedList<>();
 		args.add(23L);
 		args.add(24L);
-		
+
 		windowFunction.apply(42L, w, args, c);
 		verify(mock).process(
 				eq(42L),
@@ -379,6 +465,83 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalAggregateProcessAllWindowFunction() throws Exception {
+
+		AggregateProcessAllWindowFunctionMock mock = mock(AggregateProcessAllWindowFunctionMock.class);
+
+		InternalAggregateProcessAllWindowFunction<Long, Set<Long>, Map<Long, Long>, String, TimeWindow> windowFunction =
+				new InternalAggregateProcessAllWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Set<Long> createAccumulator() {
+						return new HashSet<>();
+					}
+
+					@Override
+					public void add(Long value, Set<Long> accumulator) {
+						accumulator.add(value);
+					}
+
+					@Override
+					public Map<Long, Long> getResult(Set<Long> accumulator) {
+						Map<Long, Long> result = new HashMap<>();
+						for (Long in : accumulator) {
+							result.put(in, in);
+						}
+						return result;
+					}
+
+					@Override
+					public Set<Long> merge(Set<Long> a, Set<Long> b) {
+						a.addAll(b);
+						return a;
+					}
+				}, mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		List<Long> args = new LinkedList<>();
+		args.add(23L);
+		args.add(24L);
+
+		windowFunction.apply(((byte)0), w, args, c);
+		verify(mock).process(
+				(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
+				(Iterable) argThat(containsInAnyOrder(allOf(
+						hasEntry(is(23L), is(23L)),
+						hasEntry(is(24L), is(24L))))),
+				eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
 	public static class ProcessWindowFunctionMock
 		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -405,6 +568,19 @@ public class InternalWindowFunctionTest {
 		public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
 	}
 
+	public static class AggregateProcessAllWindowFunctionMock
+			extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
+			implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+	}
+
 	public static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -430,4 +606,17 @@ public class InternalWindowFunctionTest {
 		@Override
 		public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { }
 	}
+
+	public static class ProcessAllWindowFunctionMock
+		extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+		implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+	}
 }