You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:40 UTC

[13/24] flink git commit: [FLINK-2550] [streaming] Make fast-path processing time windows fault tolerant

[FLINK-2550] [streaming] Make fast-path processing time windows fault tolerant


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c24dca50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c24dca50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c24dca50

Branch: refs/heads/master
Commit: c24dca501e1b0e1dcdc38d2e81e0a182bc2ae6bb
Parents: 479bec0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 8 21:38:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200

----------------------------------------------------------------------
 .../core/memory/DataInputViewStreamWrapper.java |  38 +++
 .../memory/DataOutputViewStreamWrapper.java     |  59 ++++
 .../OutputViewDataOutputStreamWrapper.java      |   5 +-
 .../OutputViewObjectOutputStreamWrapper.java    |   7 +-
 .../streaming/api/datastream/KeyedStream.java   |  31 +-
 .../api/datastream/WindowedStream.java          |  21 +-
 .../flink/streaming/api/graph/StreamConfig.java |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   4 +-
 .../api/operators/AbstractStreamOperator.java   |  12 +-
 .../flink/streaming/api/state/StateBackend.java |  81 ++++-
 .../api/state/filesystem/FsStateBackend.java    |   3 +
 .../api/state/memory/MemoryStateBackend.java    |   3 +
 ...ractAlignedProcessingTimeWindowOperator.java | 126 +++++++-
 .../windowing/AbstractKeyedTimePanes.java       |  82 ++++-
 .../windowing/AccumulatingKeyedTimePanes.java   |   6 +-
 ...ccumulatingProcessingTimeWindowOperator.java | 117 ++++++-
 .../windowing/AggregatingKeyedTimePanes.java    |   6 +-
 ...AggregatingProcessingTimeWindowOperator.java |   7 +-
 .../streaming/runtime/tasks/StreamTask.java     |  25 +-
 .../streaming/api/AggregationFunctionTest.java  |  95 ++++--
 .../streaming/api/graph/SlotAllocationTest.java |  21 +-
 .../api/operators/StreamGroupedFoldTest.java    |  30 +-
 .../api/operators/StreamGroupedReduceTest.java  |   2 +
 ...AlignedProcessingTimeWindowOperatorTest.java | 313 ++++++++++++++++--
 ...AlignedProcessingTimeWindowOperatorTest.java | 321 +++++++++++++++++--
 .../flink/streaming/util/MockContext.java       |  34 +-
 .../util/OneInputStreamOperatorTestHarness.java |  25 +-
 27 files changed, 1271 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
new file mode 100644
index 0000000..80affea
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {
+
+	public DataInputViewStreamWrapper(InputStream in) {
+		super(in);
+	}
+	
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		if (skipBytes(numBytes) != numBytes){
+			throw new EOFException("Could not skip " + numBytes + " bytes.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
new file mode 100644
index 0000000..efcc17e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class DataOutputViewStreamWrapper extends DataOutputStream implements DataOutputView {
+
+	private byte[] tempBuffer;
+	
+	public DataOutputViewStreamWrapper(OutputStream out) {
+		super(out);
+	}
+
+	@Override
+	public void skipBytesToWrite(int numBytes) throws IOException {
+		if (tempBuffer == null) {
+			tempBuffer = new byte[4096];
+		}
+
+		while (numBytes > 0) {
+			int toWrite = Math.min(numBytes, tempBuffer.length);
+			write(tempBuffer, 0, toWrite);
+			numBytes -= toWrite;
+		}
+	}
+
+	@Override
+	public void write(DataInputView source, int numBytes) throws IOException {
+		if (tempBuffer == null) {
+			tempBuffer = new byte[4096];
+		}
+		
+		while (numBytes > 0) {
+			int toCopy = Math.min(numBytes, tempBuffer.length);
+			source.read(tempBuffer, 0, toCopy);
+			write(tempBuffer, 0, toCopy);
+			numBytes -= toCopy;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
index ffe36c0..3be5d8b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
@@ -21,7 +21,6 @@ package org.apache.flink.core.memory;
 import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 
 public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closeable {
 	
@@ -43,9 +42,7 @@ public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closea
 
 	@Override
 	public void skipBytesToWrite(int numBytes) throws IOException {
-		byte[] bytes = new byte[numBytes];
-		Arrays.fill(bytes, (byte)0);
-		out.write(bytes);
+		out.write(new byte[numBytes]);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
index b84e07e..49cc3a7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
@@ -20,9 +20,9 @@ package org.apache.flink.core.memory;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
-import java.util.Arrays;
 
 public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
+	
 	private final ObjectOutputStream out;
 
 	public OutputViewObjectOutputStreamWrapper(ObjectOutputStream out){
@@ -31,10 +31,7 @@ public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
 
 	@Override
 	public void skipBytesToWrite(int numBytes) throws IOException {
-		byte[] buffer = new byte[numBytes];
-		Arrays.fill(buffer, (byte) 0);
-
-		out.write(buffer);
+		out.write(new byte[numBytes]);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index cdea910..d4a3a77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -60,10 +60,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * @param <KEY> The type of the key in the Keyed Stream.
  */
 public class KeyedStream<T, KEY> extends DataStream<T> {
-	
-	protected final KeySelector<T, KEY> keySelector;
 
-	protected final TypeInformation<KEY> keyType;
+	/** The key selector that can get the key by which the stream if partitioned from the elements */
+	private final KeySelector<T, KEY> keySelector;
+
+	/** The type of the key by which the stream is partitioned */
+	private final TypeInformation<KEY> keyType;
 	
 	/**
 	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
@@ -93,18 +95,35 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		this.keySelector = keySelector;
 		this.keyType = keyType;
 	}
-
 	
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the key selector that can get the key by which the stream if partitioned from the elements.
+	 * @return The key selector for the key.
+	 */
 	public KeySelector<T, KEY> getKeySelector() {
 		return this.keySelector;
 	}
 
-	
+	/**
+	 * Gets the type of the key by which the stream is partitioned. 
+	 * @return The type of the key by which the stream is partitioned.
+	 */
+	public TypeInformation<KEY> getKeyType() {
+		return keyType;
+	}
+
 	@Override
 	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
 		throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
 	}
 
+	// ------------------------------------------------------------------------
+	//  basic transformations
+	// ------------------------------------------------------------------------
 	
 	@Override
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
@@ -119,8 +138,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		
 		return returnStream;
 	}
-
-	
 	
 	@Override
 	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1b511d8..f1220de 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -503,7 +503,10 @@ public class WindowedStream<T, K, W extends Window> {
 				@SuppressWarnings("unchecked")
 				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
 						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), windowLength, windowSlide);
+								reducer, input.getKeySelector(), 
+								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+								windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
 			else if (function instanceof WindowFunction) {
@@ -511,7 +514,10 @@ public class WindowedStream<T, K, W extends Window> {
 				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(), windowLength, windowSlide);
+						wf, input.getKeySelector(),
+						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+						windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
 		} else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
@@ -528,7 +534,11 @@ public class WindowedStream<T, K, W extends Window> {
 				@SuppressWarnings("unchecked")
 				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
 						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), windowLength, windowSlide);
+								reducer,
+								input.getKeySelector(),
+								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+								windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
 			else if (function instanceof WindowFunction) {
@@ -536,7 +546,10 @@ public class WindowedStream<T, K, W extends Window> {
 				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(), windowLength, windowSlide);
+						wf, input.getKeySelector(),
+						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+						windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 55afc93..76be598 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -386,7 +386,7 @@ public class StreamConfig implements Serializable {
 		}
 	}
 	
-	public void setStatePartitioner(KeySelector<?, Serializable> partitioner) {
+	public void setStatePartitioner(KeySelector<?, ?> partitioner) {
 		try {
 			InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER);
 		} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4c5c19c..0652406 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -64,8 +64,6 @@ import org.apache.sling.commons.json.JSONException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -149,7 +147,7 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public void setStateBackend(StateBackend<?> backend) {
-		this.stateBackend = requireNonNull(backend);
+		this.stateBackend = backend;
 	}
 
 	public StateBackend<?> getStateBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index e99d54d..ca86627 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -144,13 +144,9 @@ public abstract class AbstractStreamOperator<OUT>
 
 	@Override
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		// here, we deal with operator checkpoints and key/value state snapshots
+		// here, we deal with key/value state snapshots
 		
 		StreamTaskState state = new StreamTaskState();
-		
-		// (1) checkpoint the operator, if the operator is stateful
-		
-		// (2) draw a snapshot of the key/value state 
 		if (keyValueState != null) {
 			KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
 			state.setKvState(snapshot);
@@ -161,10 +157,8 @@ public abstract class AbstractStreamOperator<OUT>
 	
 	@Override
 	public void restoreState(StreamTaskState state) throws Exception {
-		// (1) checkpoint the operator, if the operator is stateful
-		
-		// (2) restore the key/value state. the actual restore happens lazily, when the function requests
-		//     the state again, because the restore method needs information provided by the user function
+		// restore the key/value state. the actual restore happens lazily, when the function requests
+		// the state again, because the restore method needs information provided by the user function
 		keyValueStateSnapshot = state.getKvState();
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
index b4fce7e..f4391ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.state;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.StateHandle;
 
 import java.io.IOException;
@@ -57,6 +61,14 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
 	 */
 	public abstract void disposeAllStateForCurrentJob() throws Exception;
+
+	/**
+	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
+	 * checkpoint data.
+	 * 
+	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 */
+	public abstract void close() throws Exception;
 	
 	// ------------------------------------------------------------------------
 	//  key/value state
@@ -96,7 +108,21 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	 */
 	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception;
-
+	
+	/**
+	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
+	 * When the stream is closes, it returns a state handle that can retrieve the state back.
+	 *
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return An DataOutputView stream that writes state for the given checkpoint.
+	 *
+	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 */
+	public CheckpointStateOutputView createCheckpointStateOutputView(
+			long checkpointID, long timestamp) throws Exception {
+		return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
+	}
 
 	/**
 	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
@@ -132,4 +158,57 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 		 */
 		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
 	}
+
+	/**
+	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
+	 */
+	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
+		
+		private final CheckpointStateOutputStream out;
+		
+		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
+			super(out);
+			this.out = out;
+		}
+
+		/**
+		 * Closes the stream and gets a state handle that can create a DataInputView.
+		 * producing the data written to this stream.
+		 *
+		 * @return A state handle that can create an input stream producing the data written to this stream.
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+			return new DataInputViewHandle(out.closeAndGetHandle());
+		}
+
+		@Override
+		public void close() throws IOException {
+			out.close();
+		}
+	}
+
+	/**
+	 * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
+	 */
+	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+
+		private static final long serialVersionUID = 2891559813513532079L;
+		
+		private final StreamStateHandle stream;
+
+		private DataInputViewHandle(StreamStateHandle stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
+			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); 
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			stream.discardState();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
index 1fc2457..3cbd227 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
@@ -231,6 +231,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 		}
 	}
 
+	@Override
+	public void close() throws Exception {}
+
 	// ------------------------------------------------------------------------
 	//  state backend operations
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
index b2dfae8..05368bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
@@ -75,6 +75,9 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 		// nothing to do here, GC will do it
 	}
 
+	@Override
+	public void close() throws Exception {}
+
 	// ------------------------------------------------------------------------
 	//  State backend operations
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 227de49..cf8575e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -21,18 +21,24 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.commons.math3.util.ArithmeticUtils;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.state.StateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
+import static java.util.Objects.requireNonNull;
 
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function> 
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> 
 		extends AbstractUdfStreamOperator<OUT, F> 
 		implements OneInputStreamOperator<IN, OUT>, Triggerable {
 	
@@ -45,6 +51,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	private final Function function;
 	private final KeySelector<IN, KEY> keySelector;
 	
+	private final TypeSerializer<KEY> keySerializer;
+	private final TypeSerializer<STATE> stateTypeSerializer;
+	
 	private final long windowSize;
 	private final long windowSlide;
 	private final long paneSize;
@@ -52,24 +61,25 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	
 	// ----- fields for operator functionality -----
 	
-	private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+	private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
 	
 	private transient TimestampedCollector<OUT> out;
 	
+	private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
+	
 	private transient long nextEvaluationTime;
 	private transient long nextSlideTime;
 	
 	protected AbstractAlignedProcessingTimeWindowOperator(
 			F function,
 			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			TypeSerializer<STATE> stateTypeSerializer,
 			long windowLength,
 			long windowSlide)
 	{
 		super(function);
 		
-		if (function == null || keySelector == null) {
-			throw new NullPointerException();
-		}
 		if (windowLength < MIN_SLIDE_TIME) {
 			throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
 		}
@@ -87,8 +97,10 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 							"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
 		}
 		
-		this.function = function;
-		this.keySelector = keySelector;
+		this.function = requireNonNull(function);
+		this.keySelector = requireNonNull(keySelector);
+		this.keySerializer = requireNonNull(keySerializer);
+		this.stateTypeSerializer = requireNonNull(stateTypeSerializer);
 		this.windowSize = windowLength;
 		this.windowSlide = windowSlide;
 		this.paneSize = paneSlide;
@@ -96,7 +108,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 	
 	
-	protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+	protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(
 			KeySelector<IN, KEY> keySelector, Function function);
 
 	// ------------------------------------------------------------------------
@@ -106,19 +118,53 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	@Override
 	public void open() throws Exception {
 		super.open();
-		
+
 		out = new TimestampedCollector<>(output);
 		
-		// create the panes that gather the elements per slide
-		panes = createPanes(keySelector, function);
-		
 		// decide when to first compute the window and when to slide it
 		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
 		final long now = System.currentTimeMillis();
 		nextEvaluationTime = now + windowSlide - (now % windowSlide);
 		nextSlideTime = now + paneSize - (now % paneSize);
+
+		final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
+		
+		// check if we restored state and if we need to fire some windows based on that restored state
+		if (restoredState == null) {
+			// initial empty state: create empty panes that gather the elements per slide
+			panes = createPanes(keySelector, function);
+		}
+		else {
+			// restored state
+			panes = restoredState.panes;
+			
+			long nextPastEvaluationTime = restoredState.nextEvaluationTime;
+			long nextPastSlideTime = restoredState.nextSlideTime;
+			long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
+			int numPanesRestored = panes.getNumPanes();
+			
+			// fire windows from the past as long as there are more panes with data and as long
+			// as the missed trigger times have not caught up with the presence
+			while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) {
+				// evaluate the window from the past
+				if (nextPastTriggerTime == nextPastEvaluationTime) {
+					computeWindow(nextPastTriggerTime);
+					nextPastEvaluationTime += windowSlide;
+				}
+				
+				// evaluate slide from the past
+				if (nextPastTriggerTime == nextPastSlideTime) {
+					panes.slidePanes(numPanesPerWindow);
+					numPanesRestored--;
+					nextPastSlideTime += paneSize;
+				}
+
+				nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
+			}
+		}
 		
-		registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+		// make sure the first window happens
+		registerTimer(firstTriggerTime, this);
 	}
 
 	@Override
@@ -197,6 +243,44 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+		
+		// we write the panes with the key/value maps into the stream, as well as when this state
+		// should have triggered and slided
+		StateBackend.CheckpointStateOutputView out = 
+				getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		out.writeLong(nextEvaluationTime);
+		out.writeLong(nextSlideTime);
+		panes.writeToOutput(out, keySerializer, stateTypeSerializer);
+		
+		taskState.setOperatorState(out.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+		DataInputView in = inputState.getState(getUserCodeClassloader());
+		
+		final long nextEvaluationTime = in.readLong();
+		final long nextSlideTime = in.readLong();
+
+		AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function);
+		panes.readFromInput(in, keySerializer, stateTypeSerializer);
+		
+		restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Property access (for testing)
 	// ------------------------------------------------------------------------
 
@@ -232,4 +316,20 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	public String toString() {
 		return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
 	}
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	
+	private static final class RestoredState<IN, KEY, STATE, OUT> {
+
+		final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
+		final long nextEvaluationTime;
+		final long nextSlideTime;
+
+		RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) {
+			this.panes = panes;
+			this.nextEvaluationTime = nextEvaluationTime;
+			this.nextSlideTime = nextSlideTime;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index 07dea06..d1cea20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -18,16 +18,29 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Iterator;
 
-
+/**
+ * Base class for a multiple key/value maps organized in panes.
+ */
 public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 	
+	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
+
+	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
+	
+	/** The latest time pane */
 	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
 
+	/** The previous time panes, ordered by time (early to late) */
 	protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
 
 	// ------------------------------------------------------------------------
@@ -43,6 +56,10 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 		previousPanes.clear();
 	}
 	
+	public int getNumPanes() {
+		return previousPanes.size() + 1;
+	}
+	
 	
 	public void slidePanes(int panesToKeep) {
 		if (panesToKeep > 1) {
@@ -74,4 +91,67 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 		// let the maps make a coordinated traversal and evaluate the window function per contained key
 		KeyMap.traverseMaps(panes, traversal, traversalPass);
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Serialization and de-serialization
+	// ------------------------------------------------------------------------
+
+	public void writeToOutput(
+			final DataOutputView output,
+			final TypeSerializer<Key> keySerializer,
+			final TypeSerializer<Aggregate> aggSerializer) throws IOException
+	{
+		output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
+		
+		int numPanes = getNumPanes();
+		output.writeInt(numPanes);
+		
+		// write from the past
+		Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
+		for (int paneNum = 0; paneNum < numPanes; paneNum++) {
+			output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
+			KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
+			
+			output.writeInt(pane.size());
+			for (KeyMap.Entry<Key, Aggregate> entry : pane) {
+				keySerializer.serialize(entry.getKey(), output);
+				aggSerializer.serialize(entry.getValue(), output);
+			}
+		}
+	}
+	
+	public void readFromInput(
+			final DataInputView input,
+			final TypeSerializer<Key> keySerializer,
+			final TypeSerializer<Aggregate> aggSerializer) throws IOException
+	{
+		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
+		int numPanes = input.readInt();
+		
+		// read from the past towards the presence
+		while (numPanes > 0) {
+			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
+			KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
+			
+			final int numElementsInPane = input.readInt();
+			for (int i = numElementsInPane - 1; i >= 0; i--) {
+				Key k = keySerializer.deserialize(input);
+				Aggregate a = aggSerializer.deserialize(input);
+				pane.put(k, a);
+			}
+			
+			if (numPanes > 1) {
+				previousPanes.addLast(pane);
+			}
+			numPanes--;
+		}
+	}
+	
+	private static void validateMagicNumber(int expected, int found) throws IOException {
+		if (expected != found) {
+			throw new IOException("Corrupt state stream - wrong magic number. " +
+				"Expected '" + Integer.toHexString(expected) +
+				"', found '" + Integer.toHexString(found) + '\'');
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 55c1be0..c854e6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -35,8 +35,10 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
 	private final WindowFunction<Type, Result, Key, Window> function;
-	
-	private long evaluationPass;
+
+	/**
+	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
+	private long evaluationPass = 1L;   
 
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 3bcffbc..7a7d04c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -19,14 +19,20 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.io.IOException;
+import java.util.ArrayList;
+
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, WindowFunction<IN, OUT, KEY, TimeWindow>>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
@@ -34,10 +40,13 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	public AccumulatingProcessingTimeWindowOperator(
 			WindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			TypeSerializer<IN> valueSerializer,
 			long windowLength,
 			long windowSlide)
 	{
-		super(function, keySelector, windowLength, windowSlide);
+		super(function, keySelector, keySerializer,
+				new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
 	}
 
 	@Override
@@ -47,4 +56,108 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Utility Serializer for Lists of Elements
+	// ------------------------------------------------------------------------
+	
+	@SuppressWarnings("ForLoopReplaceableByForEach")
+	private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
+
+		private static final long serialVersionUID = 1119562170939152304L;
+		
+		private final TypeSerializer<T> elementSerializer;
+
+		ArrayListSerializer(TypeSerializer<T> elementSerializer) {
+			this.elementSerializer = elementSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<ArrayList<T>> duplicate() {
+			TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+			return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
+		}
+
+		@Override
+		public ArrayList<T> createInstance() {
+			return new ArrayList<>();
+		}
+
+		@Override
+		public ArrayList<T> copy(ArrayList<T> from) {
+			ArrayList<T> newList = new ArrayList<>(from.size());
+			for (int i = 0; i < from.size(); i++) {
+				newList.add(elementSerializer.copy(from.get(i)));
+			}
+			return newList;
+		}
+
+		@Override
+		public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1; // var length
+		}
+
+		@Override
+		public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
+			final int size = list.size();
+			target.writeInt(size);
+			for (int i = 0; i < size; i++) {
+				elementSerializer.serialize(list.get(i), target);
+			}
+		}
+
+		@Override
+		public ArrayList<T> deserialize(DataInputView source) throws IOException {
+			final int size = source.readInt();
+			final ArrayList<T> list = new ArrayList<>(size);
+			for (int i = 0; i < size; i++) {
+				list.add(elementSerializer.deserialize(source));
+			}
+			return list;
+		}
+
+		@Override
+		public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			// copy number of elements
+			final int num = source.readInt();
+			target.writeInt(num);
+			for (int i = 0; i < num; i++) {
+				elementSerializer.copy(source, target);
+			}
+		}
+
+		// --------------------------------------------------------------------
+		
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this || 
+					(obj != null && obj.getClass() == getClass() && 
+						elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return elementSerializer.hashCode();
+		}
+	} 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index c17f0b4..d395b2a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -29,8 +29,10 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 	private final KeySelector<Type, Key> keySelector;
 	
 	private final ReduceFunction<Type> reducer;
-	
-	private long evaluationPass;
+
+	/**
+	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
+	private long evaluationPass = 1L;
 
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index cc38019..0e07cea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 
 public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
@@ -31,10 +32,12 @@ public class AggregatingProcessingTimeWindowOperator<KEY, IN>
 	public AggregatingProcessingTimeWindowOperator(
 			ReduceFunction<IN> function,
 			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			TypeSerializer<IN> aggregateSerializer,
 			long windowLength,
 			long windowSlide)
 	{
-		super(function, keySelector, windowLength, windowSlide);
+		super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bbfd233..5bf7d8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -188,7 +188,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		boolean disposed = false;
 		try {
-			openAllOperators();
+			// we need to make sure that any triggers scheduled in open() cannot be
+			// executed before all operators are opened
+			synchronized (lock) {
+				openAllOperators();
+			}
 
 			// let the task do its work
 			isRunning = true;
@@ -202,12 +206,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			// make sure no further checkpoint and notification actions happen.
 			// we make sure that no other thread is currently in the locked scope before
 			// we close the operators by trying to acquire the checkpoint scope lock
-			synchronized (lock) {}
-			
-			// this is part of the main logic, so if this fails, the task is considered failed
-			closeAllOperators();
+			// we also need to make sure that no triggers fire concurrently with the close logic
+			synchronized (lock) {
+				// this is part of the main logic, so if this fails, the task is considered failed
+				closeAllOperators();
+			}
 			
-			// make sure all data is flushed
+			// make sure all buffered data is flushed
 			operatorChain.flushOutputs();
 
 			// make an attempt to dispose the operators such that failures in the dispose call
@@ -239,6 +244,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			if (!disposed) {
 				disposeAllOperators();
 			}
+
+			try {
+				if (stateBackend != null) {
+					stateBackend.close();
+				}
+			} catch (Throwable t) {
+				LOG.error("Error while closing the state backend", t);
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index aeb5078..dd8dec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -24,11 +24,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -38,6 +40,7 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
 import org.junit.Test;
 
 public class AggregationFunctionTest {
@@ -78,9 +81,10 @@ public class AggregationFunctionTest {
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+		KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
 				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
 				typeInfo, config);
+		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
 
 		// aggregations tested
 		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
@@ -90,17 +94,20 @@ public class AggregationFunctionTest {
 		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
 				1, typeInfo, AggregationType.MAX, config);
 
-		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
-				getInputList());
+				getInputList(),
+				keySelector, keyType);
 
-		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
-				getInputList());
+				getInputList(),
+				keySelector, keyType);
 
-		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
-				getInputList());
+				getInputList(),
+				keySelector, keyType);
 
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
@@ -143,9 +150,10 @@ public class AggregationFunctionTest {
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+		KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
 				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
 				typeInfo, config);
+		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
 
 		// aggregations tested
 		ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
@@ -154,15 +162,20 @@ public class AggregationFunctionTest {
 		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
 				false, config);
 
-		List<MyPojo> groupedSumList = MockContext.createAndExecute(
+		List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
-				getInputPojoList());
-		List<MyPojo> groupedMinList = MockContext.createAndExecute(
+				getInputPojoList(),
+				keySelector, keyType);
+		
+		List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
-				getInputPojoList());
-		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
+				getInputPojoList(),
+				keySelector, keyType);
+
+		List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
-				getInputPojoList());
+				getInputPojoList(),
+				keySelector, keyType);
 
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
@@ -200,9 +213,10 @@ public class AggregationFunctionTest {
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+		KeySelector<Tuple3<Integer, Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
 				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
 				typeInfo, config);
+		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
 		
 		// aggregations tested
 		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
@@ -214,18 +228,25 @@ public class AggregationFunctionTest {
 		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
 				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
 
-		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+		assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByList()));
-		assertEquals(maxByLastExpected, MockContext.createAndExecute(
+				getInputByList(),
+				keySelector, keyType));
+		
+		assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByList()));
-		assertEquals(minByLastExpected, MockContext.createAndExecute(
+				getInputByList(),
+				keySelector, keyType));
+		
+		assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByList()));
-		assertEquals(minByFirstExpected, MockContext.createAndExecute(
+				getInputByList(),
+				keySelector, keyType));
+		
+		assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByList()));
+				getInputByList(),
+				keySelector, keyType));
 	}
 
 	@Test
@@ -258,9 +279,10 @@ public class AggregationFunctionTest {
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+		KeySelector<MyPojo3, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
 				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
 				typeInfo, config);
+		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
 
 		// aggregations tested
 		ReduceFunction<MyPojo3> maxByFunctionFirst =
@@ -272,18 +294,25 @@ public class AggregationFunctionTest {
 		ReduceFunction<MyPojo3> minByFunctionLast =
 				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
 
-		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByPojoList()));
-		assertEquals(maxByLastExpected, MockContext.createAndExecute(
+		assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
+						new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
+						getInputByPojoList(),
+						keySelector, keyType));
+		
+		assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByPojoList()));
-		assertEquals(minByLastExpected, MockContext.createAndExecute(
+				getInputByPojoList(),
+				keySelector, keyType));
+		
+		assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByPojoList()));
-		assertEquals(minByFirstExpected, MockContext.createAndExecute(
+				getInputByPojoList(),
+				keySelector, keyType));
+
+		assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
 				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByPojoList()));
+				getInputByPojoList(),
+				keySelector, keyType));
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 39a13b3..8038cfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -25,23 +25,19 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
 
-public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
+import org.junit.Test;
 
-	@SuppressWarnings("serial")
+@SuppressWarnings("serial")
+public class SlotAllocationTest {
+	
 	@Test
 	public void test() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
-
 			@Override
-			public boolean filter(Long value) throws Exception {
-
-				return false;
-			}
+			public boolean filter(Long value) { return false; }
 		};
 
 		env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
@@ -53,11 +49,8 @@ public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
 		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
 
 		assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
-		assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1)
-				.getSlotSharingGroup());
-		assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3)
-				.getSlotSharingGroup());
+		assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
+		assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
 		assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 1002b10..f6e7e6b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -23,17 +23,18 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for {@link StreamGroupedFold}. These test that:
  *
@@ -48,18 +49,12 @@ public class StreamGroupedFoldTest {
 
 	private static class MyFolder implements FoldFunction<Integer, String> {
 
-		private static final long serialVersionUID = 1L;
-
 		@Override
 		public String fold(String accumulator, Integer value) throws Exception {
 			return accumulator + value.toString();
 		}
-
 	}
 
-	private TypeInformation<Integer> inType = TypeExtractor.getForClass(Integer.class);
-	private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
-
 	@Test
 	public void testGroupedFold() throws Exception {
 
@@ -72,9 +67,10 @@ public class StreamGroupedFoldTest {
 		};
 		
 		StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
-		operator.setOutputType(outType, new ExecutionConfig());
+		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
 
 		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -112,7 +108,9 @@ public class StreamGroupedFoldTest {
 		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-
+		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		
+		
 		long initialTime = 0L;
 
 		testHarness.open();
@@ -122,8 +120,8 @@ public class StreamGroupedFoldTest {
 
 		testHarness.close();
 
-		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+		assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
+		assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
 	}
 
 	// This must only be used in one test, otherwise the static fields will be changed
@@ -138,7 +136,7 @@ public class StreamGroupedFoldTest {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 			if (closeCalled) {
-				Assert.fail("Close called before open.");
+				fail("Close called before open.");
 			}
 			openCalled = true;
 		}
@@ -147,7 +145,7 @@ public class StreamGroupedFoldTest {
 		public void close() throws Exception {
 			super.close();
 			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
+				fail("Open was not called before close.");
 			}
 			closeCalled = true;
 		}
@@ -155,7 +153,7 @@ public class StreamGroupedFoldTest {
 		@Override
 		public String fold(String acc, Integer in) throws Exception {
 			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
+				fail("Open was not called before run.");
 			}
 			return acc + in;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index b5d2bd6..6cb46c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -53,6 +53,7 @@ public class StreamGroupedReduceTest {
 		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 		long initialTime = 0L;
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -84,6 +85,7 @@ public class StreamGroupedReduceTest {
 		StreamGroupedReduce<Integer> operator =
 				new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 		long initialTime = 0L;