You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:16 UTC

[6/8] flink git commit: [FLINK-4997] [streaming] Introduce ProcessWindowFunction

[FLINK-4997] [streaming] Introduce ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dcb2dcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dcb2dcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dcb2dcd

Branch: refs/heads/master
Commit: 1dcb2dcd8969941988a4fc7e5488e9272dfd507e
Parents: 82db667
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 23 18:00:23 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 381 ++++++++++++++---
 .../FoldApplyProcessWindowFunction.java         | 120 ++++++
 .../windowing/ProcessWindowFunction.java        |  61 +++
 .../ReduceApplyProcessWindowFunction.java       |  80 ++++
 .../windowing/RichProcessWindowFunction.java    |  85 ++++
 .../windowing/AccumulatingKeyedTimePanes.java   |  12 +-
 ...ccumulatingProcessingTimeWindowOperator.java |  16 +-
 .../InternalIterableProcessWindowFunction.java  |  63 +++
 ...nternalSingleValueProcessWindowFunction.java |  66 +++
 .../FoldApplyProcessWindowFunctionTest.java     | 155 +++++++
 .../operators/FoldApplyWindowFunctionTest.java  |  28 +-
 .../functions/InternalWindowFunctionTest.java   | 101 ++++-
 ...AlignedProcessingTimeWindowOperatorTest.java | 419 ++++++++++++++++++-
 .../operators/windowing/WindowOperatorTest.java | 177 ++++++++
 .../streaming/runtime/WindowFoldITCase.java     |  78 ++++
 15 files changed, 1738 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 04da04d..45eaae5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AggregateFunction;
@@ -39,8 +40,11 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -60,9 +64,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -360,6 +367,98 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, ProcessWindowFunction.class, true, true, input.getType(), null, false);
+
+		return reduce(reduceFunction, function, resultType);
+	}
+
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+		if (reduceFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
+		}
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+					new EvictingWindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
+							trigger,
+							evictor,
+							allowedLateness);
+
+		} else {
+			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+					reduceFunction,
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+					new WindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalSingleValueProcessWindowFunction<>(function),
+							trigger,
+							allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Fold Function
 	// ------------------------------------------------------------------------
@@ -510,6 +609,117 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+		}
+
+		TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType(
+				windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false);
+
+		return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue the initial value to be passed to the first invocation of the fold function
+	 * @param foldFunction The fold function.
+	 * @param foldResultType The result type of the fold function.
+	 * @param windowFunction The process window function.
+	 * @param windowResultType The process window function result type.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@Internal
+	public <R, ACC> SingleOutputStreamOperator<R> fold(
+			ACC initialValue,
+			FoldFunction<T, ACC> foldFunction,
+			ProcessWindowFunction<ACC, R, K, W> windowFunction,
+			TypeInformation<ACC> foldResultType,
+			TypeInformation<R> windowResultType) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+		}
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+					new EvictingWindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
+							trigger,
+							evictor,
+							allowedLateness);
+
+		} else {
+			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
+					initialValue,
+					foldFunction,
+					foldResultType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+					new WindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalSingleValueProcessWindowFunction<>(windowFunction),
+							trigger,
+							allowedLateness);
+		}
+
+		return input.transform(opName, windowResultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregation Function
 	// ------------------------------------------------------------------------
@@ -733,11 +943,53 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
-
-		//clean the closure
+		String callLocation = Utils.getCallLocationName();
 		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableWindowFunction<>(function), resultType, callLocation);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+			function, ProcessWindowFunction.class, true, true, getInputType(), null, false);
+
+		return process(function, resultType);
+	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
 		String callLocation = Utils.getCallLocationName();
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableProcessWindowFunction<>(function), resultType, callLocation);
+	}
+
+	private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, String callLocation) {
+
 		String udfName = "WindowedStream." + callLocation;
 
 		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
@@ -767,7 +1019,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableWindowFunction<>(function),
+					function,
 					trigger,
 					evictor,
 					allowedLateness);
@@ -784,7 +1036,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableWindowFunction<>(function),
+					function,
 					trigger,
 					allowedLateness,
 					legacyWindowOpType);
@@ -1211,7 +1463,7 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			Function function,
+			ReduceFunction<?> function,
 			TypeInformation<R> resultType,
 			String functionName) {
 
@@ -1222,30 +1474,18 @@ public class WindowedStream<T, K, W extends Window> {
 
 			String opName = "Fast " + timeWindows + " of " + functionName;
 
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+			@SuppressWarnings("unchecked")
+			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+					new AggregatingProcessingTimeWindowOperator<>(
+							reducer, input.getKeySelector(),
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+							windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+
 		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
@@ -1253,36 +1493,69 @@ public class WindowedStream<T, K, W extends Window> {
 
 			String opName = "Fast " + timeWindows + " of " + functionName;
 
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer,
-								input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+			@SuppressWarnings("unchecked")
+			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+					new AggregatingProcessingTimeWindowOperator<>(
+							reducer,
+							input.getKeySelector(),
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+							windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+		}
+
+		return null;
+	}
+
+	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
+			InternalWindowFunction<Iterable<T>, R, K, W> function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSlide();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			@SuppressWarnings("unchecked")
+			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+					timeWindowFunction, input.getKeySelector(),
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+					windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSize();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			@SuppressWarnings("unchecked")
+			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+
+			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+					timeWindowFunction, input.getKeySelector(),
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+					windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
 		}
 
 		return null;
 	}
 
+
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
new file mode 100644
index 0000000..e1bc759
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
+	extends RichProcessWindowFunction<T, R, K, W>
+	implements OutputTypeConfigurable<R> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final FoldFunction<T, ACC> foldFunction;
+	private final ProcessWindowFunction<ACC, R, K, W> windowFunction;
+
+	private byte[] serializedInitialValue;
+	private TypeSerializer<ACC> accSerializer;
+	private final TypeInformation<ACC> accTypeInformation;
+	private transient ACC initialValue;
+
+	public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
+		this.windowFunction = windowFunction;
+		this.foldFunction = foldFunction;
+		this.initialValue = initialValue;
+		this.accTypeInformation = accTypeInformation;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+				"window function. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+		initialValue = accSerializer.deserialize(in);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+	@Override
+	public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+		ACC result = accSerializer.copy(initialValue);
+
+		for (T val : values) {
+			result = foldFunction.fold(result, val);
+		}
+
+		windowFunction.process(key, windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+		accSerializer = accTypeInformation.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+
+		try {
+			accSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+				initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
new file mode 100644
index 0000000..9c48e24
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Evaluates the window and outputs none or several elements.
+	 *
+	 * @param key The key for which this window is evaluated.
+	 * @param context The context in which the window is being evaluated.
+	 * @param elements The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
+
+	/**
+	 * The context holding window metadata. As a non-static inner class, each instance belongs to one function.
+	 */
+	public abstract class Context {
+		/**
+		 * @return The window that is being evaluated.
+		 */
+		public abstract W window();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
new file mode 100644
index 0000000..9ea1fdf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
+	extends RichProcessWindowFunction<T, R, K, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+	private final ProcessWindowFunction<T, R, K, W> windowFunction;
+
+	public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
+		this.windowFunction = windowFunction;
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void process(K k, final Context context, Iterable<T> input, Collector<R> out) throws Exception {
+		// Reduce all window elements to a single value; it stays null if the input has no elements.
+		T curr = null;
+		for (T val: input) {
+			if (curr == null) {
+				curr = val;
+			} else {
+				curr = reduceFunction.reduce(curr, val);
+			}
+		}
+		windowFunction.process(k, windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+		// Also hand the runtime context to the wrapped window function.
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
new file mode 100644
index 0000000..ac55bc6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Rich variant of {@link ProcessWindowFunction} that adds access to the {@link RuntimeContext} and
+ * no-op default {@code open}/{@code close} life cycle methods.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends ProcessWindowFunction<IN, OUT, KEY, W>
+		implements RichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Runtime context access
+	// --------------------------------------------------------------------------------------------
+
+	private transient RuntimeContext runtimeContext;
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		this.runtimeContext = t;
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return this.runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		if (this.runtimeContext == null) {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
+			return (IterationRuntimeContext) this.runtimeContext;
+		} else {
+			throw new IllegalStateException("This stub is not part of an iteration step function.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Default life cycle methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {}
+
+	@Override
+	public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index a252ece..87c5aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
-	private final WindowFunction<Type, Result, Key, Window> function;
+	private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 
 	/**
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	// ------------------------------------------------------------------------
 	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
 	}
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	}
 
 	@Override
-	public void evaluateWindow(Collector<Result> out, TimeWindow window, 
+	public void evaluateWindow(Collector<Result> out, final TimeWindow window,
 								AbstractStreamOperator<Result> operator) throws Exception
 	{
 		if (previousPanes.isEmpty()) {
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-		private final WindowFunction<Type, Result, Key, Window> function;
+		private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 		
 		private final UnionIterator<Type> unionIterator;
 		
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private Key currentKey;
 		
 
-		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
+		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
 								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
 			this.function = function;
 			this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 7adaf13..094b34d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -23,22 +23,22 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 
 import java.util.ArrayList;
 
 @Internal
 @Deprecated
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
-
+	
 	public AccumulatingProcessingTimeWindowOperator(
-			WindowFunction<IN, OUT, KEY, TimeWindow> function,
+			InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			TypeSerializer<IN> valueSerializer,
@@ -46,14 +46,14 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 			long windowSlide)
 	{
 		super(function, keySelector, keySerializer,
-				new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
+				new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide);
 	}
 
 	@Override
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
-		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-
+		InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
+		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
new file mode 100644
index 0000000..de516a5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+		implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+		// Delegate to the wrapped function, exposing the given window through the context.
+		wrappedFunction.process(key, context, input, out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
new file mode 100644
index 0000000..b28c208
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+		implements InternalWindowFunction<IN, OUT, KEY, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+		// The single state value is handed to the wrapped function as a one-element list.
+		wrappedFunction.process(key, context, Collections.singletonList(input), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
new file mode 100644
index 0000000..af5c77a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FoldApplyProcessWindowFunctionTest {
+
+	/**
+	 * Tests that the {@link FoldApplyProcessWindowFunction} gets the output type serializer set by the
+	 * {@link StreamGraphGenerator} and checks that the function computes the correct fold result.
+	 */
+	@Test
+	public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{
+		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+
+		List<StreamTransformation<?>> transformations = new ArrayList<>();
+
+		int initValue = 1;
+
+		FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>(
+			initValue,
+			new FoldFunction<Integer, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Integer value) throws Exception {
+					return accumulator + value;
+				}
+
+			},
+			new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void process(Integer integer,
+									Context context,
+									Iterable<Integer> input,
+									Collector<Integer> out) throws Exception {
+					for (Integer in: input) {
+						out.collect(in);
+					}
+				}
+			},
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+			new InternalIterableProcessWindowFunction<>(foldWindowFunction),
+			new KeySelector<Integer, Integer>() {
+				private static final long serialVersionUID = -7951310554369722809L;
+
+				@Override
+				public Integer getKey(Integer value) throws Exception {
+					return value;
+				}
+			},
+			IntSerializer.INSTANCE,
+			IntSerializer.INSTANCE,
+			3000,
+			3000
+		);
+
+		SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
+
+			private static final long serialVersionUID = 8297735565464653028L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		};
+
+		SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1);
+
+		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+		// NOTE(review): graph generation is presumably what invokes setOutputType on the fold function - confirm.
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+
+		List<Integer> result = new ArrayList<>();
+		List<Integer> input = new ArrayList<>();
+		List<Integer> expected = new ArrayList<>();
+
+		input.add(1);
+		input.add(2);
+		input.add(3);
+		// Expected output: the initial value plus the sum of all input elements.
+		for (int value : input) {
+			initValue += value;
+		}
+
+		expected.add(initValue);
+		// Call the function directly with a fixed window instead of executing a job.
+		foldWindowFunction.process(0, foldWindowFunction.new Context() {
+			@Override
+			public TimeWindow window() {
+				return new TimeWindow(0, 1);
+			}
+		}, input, new ListCollector<>(result));
+
+		Assert.assertEquals(expected, result);
+	}
+
+	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 91ec427..fecd440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.junit.Assert;
@@ -81,19 +82,20 @@ public class FoldApplyWindowFunctionTest {
 		);
 
 		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-			foldWindowFunction,
-			new KeySelector<Integer, Integer>() {
-				private static final long serialVersionUID = -7951310554369722809L;
-
-				@Override
-				public Integer getKey(Integer value) throws Exception {
-					return value;
-				}
-			},
-			IntSerializer.INSTANCE,
-			IntSerializer.INSTANCE,
-			3000,
-			3000
+			new InternalIterableWindowFunction<>(
+					foldWindowFunction),
+				new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = -7951310554369722809L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				},
+				IntSerializer.INSTANCE,
+				IntSerializer.INSTANCE,
+				3000,
+				3000
 		);
 
 		SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index f3c3423..3c73035 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -115,7 +118,48 @@ public class InternalWindowFunctionTest {
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
 		windowFunction.apply(42L, w, i, c);
-		verify(mock).apply(42L, w, i, c);
+		verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalIterableProcessWindowFunction() throws Exception {
+
+		ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+		InternalIterableProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+				new InternalIterableProcessWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, i, c);
+		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
 		windowFunction.close();
@@ -204,6 +248,59 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalSingleValueProcessWindowFunction() throws Exception {
+
+		ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+		InternalSingleValueProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+				new InternalSingleValueProcessWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, 23L, c);
+		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	public static class ProcessWindowFunctionMock
+		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+		implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+	}
+
 	public static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -214,7 +311,7 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void apply(Long aLong, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception { }
+		public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws Exception { }
 	}
 
 	public static class AllWindowFunctionMock

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 255a20f..508d2e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,8 +36,12 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +51,9 @@ import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,10 +71,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"serial"})
+@PrepareForTest(InternalIterableWindowFunction.class)
+@RunWith(PowerMockRunner.class)
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
+	private final InternalIterableWindowFunction<String, String, String, TimeWindow> mockFunction = mock(InternalIterableWindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -79,26 +88,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
-			new WindowFunction<Integer, Integer, Integer, TimeWindow>()
-	{
-		@Override
-		public void apply(Integer key,
-				TimeWindow window,
-				Iterable<Integer> values,
-				Collector<Integer> out) {
-			for (Integer val : values) {
-				assertEquals(key, val);
-				out.collect(val);
-			}
-		}
-	};
+	private final InternalIterableWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new InternalIterableWindowFunction<>(new WindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+					for (Integer val : values) {
+						assertEquals(key, val);
+						out.collect(val);
+					}
+				}
+			});
+
+	private final InternalIterableProcessWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityProcessFunction =
+			new InternalIterableProcessWindowFunction<>(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void process(Integer key, Context context, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+					for (Integer val : values) {
+						assertEquals(key, val);
+						out.collect(val);
+					}
+				}
+			});
 
 	// ------------------------------------------------------------------------
 
 	public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
 		ClosureCleaner.clean(identitySelector, false);
 		ClosureCleaner.clean(validatingIdentityFunction, false);
+		ClosureCleaner.clean(validatingIdentityProcessFunction, false);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -281,6 +298,50 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testTumblingWindowWithProcessFunction() throws Exception {
+		try {
+			final int windowSize = 50;
+
+			// tumbling window that triggers every 50 milliseconds (windowSize)
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSize);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
+
+			final int numElements = 1000;
+
+			long currentTime = 0;
+
+			for (int i = 0; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+				currentTime = currentTime + 10;
+				testHarness.setProcessingTime(currentTime);
+			}
+
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+			assertEquals(numElements, result.size());
+
+			Collections.sort(result);
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(i, result.get(i).intValue());
+			}
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testSlidingWindow() throws Exception {
 
 		// tumbling window that triggers every 20 milliseconds
@@ -333,6 +394,58 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testSlidingWindowWithProcessFunction() throws Exception {
+
+		// sliding window of 150 milliseconds that slides every 50 milliseconds
+		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		final int numElements = 1000;
+
+		long currentTime = 0;
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(i));
+			currentTime = currentTime + 10;
+			testHarness.setProcessingTime(currentTime);
+		}
+
+		// get and verify the result
+		List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+		// if we kept this running, each element would be in the result three times (for each slide).
+		// we are closing the window before the final panes are through three times, so we may have fewer
+		// elements.
+		if (result.size() < numElements || result.size() > 3 * numElements) {
+			fail("Wrong number of results: " + result.size());
+		}
+
+		Collections.sort(result);
+		int lastNum = -1;
+		int lastCount = -1;
+
+		for (int num : result) {
+			if (num == lastNum) {
+				lastCount++;
+				assertTrue(lastCount <= 3);
+			}
+			else {
+				lastNum = num;
+				lastCount = 1;
+			}
+		}
+
+		testHarness.close();
+	}
+
+	@Test
 	public void testTumblingWindowSingleElements() throws Exception {
 
 		try {
@@ -379,7 +492,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	@Test
+	public void testTumblingWindowSingleElementsWithProcessFunction() throws Exception {
+
+		try {
+
+			// tumbling window that triggers every 50 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
+
+			testHarness.setProcessingTime(50);
+
+			testHarness.processElement(new StreamRecord<>(3));
+			testHarness.processElement(new StreamRecord<>(4));
+			testHarness.processElement(new StreamRecord<>(5));
+
+			testHarness.setProcessingTime(100);
+
+			testHarness.processElement(new StreamRecord<>(6));
+
+			testHarness.setProcessingTime(200);
+
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+			assertEquals(6, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
 		try {
@@ -420,6 +581,126 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testSlidingWindowSingleElementsWithProcessFunction() throws Exception {
+		try {
+
+			// sliding window of 150 milliseconds that slides every 50 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+			assertEquals(6, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void checkpointRestoreWithPendingWindowTumblingWithProcessFunction() {
+		try {
+			final int windowSize = 200;
+
+			// tumbling window that triggers every 200 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSize);
+
+			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0);
+
+			// inject some elements
+			final int numElementsFirst = 700;
+			final int numElements = 1000;
+			for (int i = 0; i < numElementsFirst; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			// draw a snapshot and dispose the window
+			int beforeSnapShot = testHarness.getOutput().size();
+			StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+			List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+			assertTrue(afterSnapShot <= numElementsFirst);
+
+			// inject some random elements, which should not show up in the state
+			for (int i = 0; i < 300; i++) {
+				testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
+			}
+
+			testHarness.close();
+			op.dispose();
+
+			// re-create the operator and restore the state
+			op = new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+				windowSize, windowSize);
+
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.restore(state);
+			testHarness.open();
+
+			// inject some more elements
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.setProcessingTime(400);
+
+			// get and verify the result
+			List<Integer> finalResult = new ArrayList<>();
+			finalResult.addAll(resultAtSnapshot);
+			List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(finalPartialResult);
+			assertEquals(numElements, finalResult.size());
+
+			Collections.sort(finalResult);
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(i, finalResult.get(i).intValue());
+			}
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void checkpointRestoreWithPendingWindowTumbling() {
 		try {
 			final int windowSize = 200;
@@ -501,6 +782,98 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void checkpointRestoreWithPendingWindowSlidingWithProcessFunction() {
+		try {
+			final int factor = 4;
+			final int windowSlide = 50;
+			final int windowSize = factor * windowSlide;
+
+			// sliding window (200 msecs) every 50 msecs
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSlide);
+
+			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.setup();
+			testHarness.open();
+
+			// inject some elements
+			final int numElements = 1000;
+			final int numElementsFirst = 700;
+
+			for (int i = 0; i < numElementsFirst; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			// draw a snapshot
+			List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int beforeSnapShot = testHarness.getOutput().size();
+			StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+
+			assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
+
+			// inject the remaining elements - these should not influence the snapshot
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.close();
+
+			// re-create the operator and restore the state
+			op = new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+				windowSize, windowSlide);
+
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.restore(state);
+			testHarness.open();
+
+
+			// inject again the remaining elements
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+			testHarness.setProcessingTime(200);
+			testHarness.setProcessingTime(250);
+			testHarness.setProcessingTime(300);
+			testHarness.setProcessingTime(350);
+
+			// get and verify the result
+			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+			List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(finalPartialResult);
+			assertEquals(factor * numElements, finalResult.size());
+
+			Collections.sort(finalResult);
+			for (int i = 0; i < factor * numElements; i++) {
+				assertEquals(i / factor, finalResult.get(i).intValue());
+			}
+
+			testHarness.close();
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void checkpointRestoreWithPendingWindowSliding() {
 		try {
 			final int factor = 4;
@@ -601,8 +974,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
-							new StatefulFunction(), identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+							new InternalIterableProcessWindowFunction<>(new StatefulFunction()),
+							identitySelector,
+							IntSerializer.INSTANCE,
+							IntSerializer.INSTANCE,
+							50,
+							50);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -661,7 +1038,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+	private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		// we use a concurrent map here even though there is no concurrency, to
 		// get "volatile" style access to entries
@@ -677,8 +1054,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public void apply(Integer key,
-						  TimeWindow window,
+		public void process(Integer key,
+						  Context context,
 						  Iterable<Integer> values,
 						  Collector<Integer> out) throws Exception {
 			for (Integer i : values) {