You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:32 UTC

[05/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
new file mode 100644
index 0000000..7b8dbd5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * List of task states for a chain of streaming tasks.
+ */
+public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The states for all operators */
+	private final StreamTaskState[] states;
+
+	
+	public StreamTaskStateList(StreamTaskState[] states) {
+		this.states = states;
+	}
+
+	public boolean isEmpty() {
+		for (StreamTaskState state : states) {
+			if (state != null) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	@Override
+	public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
+		return states;
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		for (StreamTaskState state : states) {
+			if (state != null) {
+				state.discardState();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
deleted file mode 100644
index a8c4b49..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
-import org.apache.flink.streaming.api.state.StreamOperatorState;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
- * operators.
- */
-public class StreamingRuntimeContext extends RuntimeUDFContext {
-
-	private final Environment env;
-	private final Map<String, StreamOperatorState<?, ?>> states;
-	private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
-	private final KeySelector<?, ?> statePartitioner;
-	private final StateHandleProvider<Serializable> provider;
-
-	/**
-	 * We need access to the {@link StreamTask} to register timer callbacks.
-	 */
-	private final StreamTask<?, ?> streamTask;
-
-	@SuppressWarnings("unchecked")
-	public StreamingRuntimeContext(
-			Environment env,
-			ExecutionConfig executionConfig,
-			KeySelector<?, ?> statePartitioner,
-			StateHandleProvider<?> provider,
-			Map<String, Accumulator<?, ?>> accumulatorMap,
-			StreamTask<?, ?> streamTask) {
-
-		super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
-				env.getUserClassLoader(), executionConfig,
-				env.getDistributedCacheEntries(), accumulatorMap);
-		
-		this.env = env;
-		this.statePartitioner = statePartitioner;
-		this.states = new HashMap<>();
-		this.partitionedStates = new LinkedList<>();
-		this.provider = (StateHandleProvider<Serializable>) provider;
-		this.streamTask = streamTask;
-	}
-
-	/**
-	 * Returns the input split provider associated with the operator.
-	 * 
-	 * @return The input split provider.
-	 */
-	public InputSplitProvider getInputSplitProvider() {
-		return env.getInputSplitProvider();
-	}
-
-	/**
-	 * Returns the stub parameters associated with the {@link TaskConfig} of the
-	 * operator.
-	 * 
-	 * @return The stub parameters.
-	 */
-	public Configuration getTaskStubParameters() {
-		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
-	}
-	
-	public StateHandleProvider<Serializable> getStateHandleProvider() {
-		return provider;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
-			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
-		if (defaultState == null) {
-			throw new RuntimeException("Cannot set default state to null.");
-		}
-		StreamOperatorState<S, C> state = (StreamOperatorState<S, C>) getState(name, partitioned);
-		state.setCheckpointer(checkpointer);
-		state.setDefaultState(defaultState);
-
-		return (OperatorState<S>) state;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned) throws IOException {
-		if (defaultState == null) {
-			throw new RuntimeException("Cannot set default state to null.");
-		}
-		StreamOperatorState<S, S> state = (StreamOperatorState<S, S>) getState(name, partitioned);
-		state.setDefaultState(defaultState);
-
-		return (OperatorState<S>) state;
-	}
-
-	public StreamOperatorState<?, ?> getState(String name, boolean partitioned) {
-		// Try fetching state from the map
-		StreamOperatorState<?, ?> state = states.get(name);
-		if (state == null) {
-			// If not found, create empty state and add to the map
-			state = createRawState(partitioned);
-			states.put(name, state);
-			// We keep a reference to all partitioned states for registering input
-			if (state instanceof PartitionedStreamOperatorState) {
-				partitionedStates.add((PartitionedStreamOperatorState<?, ?, ?>) state);
-			}
-		}
-		return state;
-	}
-
-	/**
-	 * Creates an empty {@link OperatorState}.
-	 * 
-	 * @return An empty operator state.
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public StreamOperatorState<?, ?> createRawState(boolean partitioned) {
-		if (partitioned) {
-			if (statePartitioner != null) {
-				return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
-			} else {
-				throw new RuntimeException(
-						"Partitioned state can only be used with KeyedStreams.");
-			}
-		} else {
-			return new StreamOperatorState(provider);
-		}
-	}
-
-	/**
-	 * Provides access to the all the states contained in the context
-	 * 
-	 * @return All the states for the underlying operator.
-	 */
-	public Map<String, StreamOperatorState<?, ?>> getOperatorStates() {
-		return states;
-	}
-
-	/**
-	 * Register a timer callback. At the specified time the
-	 * {@code Triggerable } will be invoked. This call is guaranteed to not happen
-	 * concurrently with method calls on the operator.
-	 *
-	 * @param time The absolute time in milliseconds.
-	 * @param target The target to be triggered.
-	 */
-	public void registerTimer(long time, Triggerable target) {
-		streamTask.registerTimer(time, target);
-	}
-
-	/**
-	 * Sets the next input of the underlying operators, used to access
-	 * partitioned states.
-	 * 
-	 * @param nextRecord
-	 *            Next input of the operator.
-	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setNextInput(StreamRecord<?> nextRecord) {
-		if (statePartitioner != null) {
-			for (PartitionedStreamOperatorState state : partitionedStates) {
-				state.setCurrentInput(nextRecord.getValue());
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 25f1a76..d2d8a2e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
@@ -35,6 +36,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	public void init() throws Exception {
+		StreamConfig configuration = getConfiguration();
+		ClassLoader userClassLoader = getUserCodeClassLoader();
+		
 		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
 		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 	
@@ -75,10 +79,13 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	protected void run() throws Exception {
-		while (running && inputProcessor.processInput(streamOperator, lock)) {
-			if (timerException != null) {
-				throw timerException;
-			}
+		// cache some references on the stack, to make the code more JIT friendly
+		final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
+		final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
+		final Object lock = getCheckpointLock();
+		
+		while (running && inputProcessor.processInput(operator, lock)) {
+			checkTimerException();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index e002780..aeb5078 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -74,8 +74,7 @@ public class AggregationFunctionTest {
 		}
 
 		// some necessary boiler plate
-		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
-				.getForObject(new Tuple2<>(0, 0));
+		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
 
 		ExecutionConfig config = new ExecutionConfig();
 
@@ -92,15 +91,15 @@ public class AggregationFunctionTest {
 				1, typeInfo, AggregationType.MAX, config);
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
 				getInputList());
 
 		assertEquals(expectedGroupSumList, groupedSumList);
@@ -156,13 +155,13 @@ public class AggregationFunctionTest {
 				false, config);
 
 		List<MyPojo> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
 				getInputPojoList());
 		List<MyPojo> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
 				getInputPojoList());
 		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
 				getInputPojoList());
 
 		assertEquals(expectedGroupSumList, groupedSumList);
@@ -216,16 +215,16 @@ public class AggregationFunctionTest {
 				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
 				getInputByList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
 				getInputByList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
 				getInputByList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
 				getInputByList()));
 	}
 
@@ -274,16 +273,16 @@ public class AggregationFunctionTest {
 				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
 				getInputByPojoList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
+				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
 				getInputByPojoList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
 				getInputByPojoList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
+				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
 				getInputByPojoList()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 3a224e4..c23a4f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -538,7 +539,9 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 		DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
 
-		assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner() != null);
+		assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
+		assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
+		assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
 		assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
 		assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index ac23cda..f7c6e53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import org.junit.After;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 2246ffd..c316604 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.NoOpSink;
@@ -262,19 +263,13 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
 		}
 
 		@Override
-		public void processWatermark1(Watermark mark) throws Exception {
-
-		}
+		public void processWatermark1(Watermark mark) throws Exception {}
 
 		@Override
-		public void processWatermark2(Watermark mark) throws Exception {
-
-		}
+		public void processWatermark2(Watermark mark) throws Exception {}
 
 		@Override
-		public void setup(Output output, StreamingRuntimeContext runtimeContext) {
-
-		}
+		public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {}
 	}
 
 	private static class OutputTypeConfigurableOperationWithOneInput

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index bc5d614..1002b10 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
  *     <li>Watermarks are correctly forwarded</li>
  * </ul>
  */
+@SuppressWarnings("serial")
 public class StreamGroupedFoldTest {
 
 	private static class MyFolder implements FoldFunction<Integer, String> {
@@ -60,20 +61,17 @@ public class StreamGroupedFoldTest {
 	private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testGroupedFold() throws Exception {
 
-		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(
-				new MyFolder(), new KeySelector<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
+		KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>() {
+			
 			@Override
-			public String getKey(Integer value) throws Exception {
+			public String getKey(Integer value) {
 				return value.toString();
 			}
-		}, "100", inType);
-
+		};
+		
+		StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
 		operator.setOutputType(outType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
@@ -102,15 +100,15 @@ public class StreamGroupedFoldTest {
 
 	@Test
 	public void testOpenClose() throws Exception {
-		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
 			@Override
-			public Integer getKey(Integer value) throws Exception {
+			public Integer getKey(Integer value) {
 				return value;
 			}
-		}, "init", inType);
-
+		};
+		
+		StreamGroupedFold<Integer, String, Integer> operator = new StreamGroupedFold<>(
+				new TestOpenCloseFoldFunction(), "init");
 		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index 85d9bc1..b5d2bd6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.streaming.api.operators;
 
-
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,9 +46,11 @@ import org.junit.Test;
 public class StreamGroupedReduceTest {
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testGroupedReduce() throws Exception {
-		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), new IntegerKeySelector(), typeInfo);
+
+		KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
+		
+		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
@@ -76,8 +78,11 @@ public class StreamGroupedReduceTest {
 
 	@Test
 	public void testOpenClose() throws Exception {
+
+		KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
+		
 		StreamGroupedReduce<Integer> operator =
-				new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), new IntegerKeySelector(), typeInfo);
+				new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
 		long initialTime = 0L;
@@ -150,5 +155,5 @@ public class StreamGroupedReduceTest {
 		}
 	}
 
-	private static TypeInformation<Integer> typeInfo = TypeExtractor.getForClass(Integer.class);
+	private static TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
new file mode 100644
index 0000000..73100d1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class FileStateBackendTest {
+	
+	@Test
+	public void testSetupAndSerialization() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			final String backendDir = localFileUri(tempDir);
+			FsStateBackend originalBackend = new FsStateBackend(backendDir);
+			
+			assertFalse(originalBackend.isInitialized());
+			assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
+			assertNull(originalBackend.getCheckpointDirectory());
+			
+			// serialize / copy the backend
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+			assertFalse(backend.isInitialized());
+			assertEquals(new URI(backendDir), backend.getBasePath().toUri());
+			assertNull(backend.getCheckpointDirectory());
+			
+			// no file operations should be possible right now
+			try {
+				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+				fail("should fail with an exception");
+			} catch (IllegalStateException e) {
+				// supreme!
+			}
+			
+			backend.initializeForJob(new JobID());
+			assertNotNull(backend.getCheckpointDirectory());
+			
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+			assertTrue(checkpointDir.exists());
+			assertTrue(isDirectoryEmpty(checkpointDir));
+			
+			backend.disposeAllStateForCurrentJob();
+			assertNull(backend.getCheckpointDirectory());
+			
+			assertTrue(isDirectoryEmpty(tempDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+	
+	@Test
+	public void testSerializableState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			String state1 = "dummy state";
+			String state2 = "row row row your boat";
+			Integer state3 = 42;
+			
+			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+			handle1.discardState();
+			
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+			handle2.discardState();
+			
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+			handle3.discardState();
+			
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testStateOutputStream() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			byte[] state1 = new byte[1274673];
+			byte[] state2 = new byte[1];
+			byte[] state3 = new byte[0];
+			byte[] state4 = new byte[177];
+			
+			Random rnd = new Random();
+			rnd.nextBytes(state1);
+			rnd.nextBytes(state2);
+			rnd.nextBytes(state3);
+			rnd.nextBytes(state4);
+
+			long checkpointId = 97231523452L;
+
+			FsStateBackend.FsCheckpointStateOutputStream stream1 = 
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream2 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream3 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			
+			stream1.write(state1);
+			stream2.write(state2);
+			stream3.write(state3);
+			
+			FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+			FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+			FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+			
+			// use with try-with-resources
+			StreamStateHandle handle4;
+			try (StateBackend.CheckpointStateOutputStream stream4 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+				stream4.write(state4);
+				handle4 = stream4.closeAndGetHandle();
+			}
+			
+			// close before accessing handle
+			StateBackend.CheckpointStateOutputStream stream5 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			stream5.write(state4);
+			stream5.close();
+			try {
+				stream5.closeAndGetHandle();
+				fail();
+			} catch (IOException e) {
+				// uh-huh
+			}
+			
+			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+			handle1.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle1.getFilePath());
+			
+			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+			handle2.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle2.getFilePath());
+			
+			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+			handle3.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureLocalFileDeleted(handle3.getFilePath());
+			
+			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+			handle4.discardState();
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testKeyValueState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+			KvState<Integer, String, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+			assertEquals(0, kv.size());
+
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertNull(kv.value());
+			kv.update("1");
+			assertEquals(1, kv.size());
+			kv.setCurrentKey(2);
+			assertNull(kv.value());
+			kv.update("2");
+			assertEquals(2, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("1", kv.value());
+			assertEquals(2, kv.size());
+
+			// draw a snapshot
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			kv.update("u1");
+			kv.setCurrentKey(2);
+			kv.update("u2");
+			kv.setCurrentKey(3);
+			kv.update("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
+					kv.shapshot(682375462379L, System.currentTimeMillis());
+
+			// validate the original state
+			assertEquals(3, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("u1", kv.value());
+			kv.setCurrentKey(2);
+			assertEquals("u2", kv.value());
+			kv.setCurrentKey(3);
+			assertEquals("u3", kv.value());
+
+			// restore the first snapshot and validate it
+			KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(2, restored1.size());
+			restored1.setCurrentKey(1);
+			assertEquals("1", restored1.value());
+			restored1.setCurrentKey(2);
+			assertEquals("2", restored1.value());
+
+			// restore the second snapshot and validate it
+			KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(3, restored2.size());
+			restored2.setCurrentKey(1);
+			assertEquals("u1", restored2.value());
+			restored2.setCurrentKey(2);
+			assertEquals("u2", restored2.value());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2.value());
+
+			snapshot1.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+
+			snapshot2.discardState();
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testRestoreWithWrongSerializers() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+
+			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+			
+			KvState<Integer, String, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+			kv.setCurrentKey(1);
+			kv.update("1");
+			kv.setCurrentKey(2);
+			kv.update("2");
+
+			KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Integer> fakeIntSerializer =
+					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<String> fakeStringSerializer =
+					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, IntSerializer.INSTANCE,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+			
+			snapshot.discardState();
+
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private static void ensureLocalFileDeleted(Path path) {
+		URI uri = path.toUri();
+		if ("file".equals(uri.getScheme())) {
+			File file = new File(uri.getPath());
+			assertFalse("file not properly deleted", file.exists());
+		}
+		else {
+			throw new IllegalArgumentException("not a local path");
+		}
+	}
+	
+	private static void deleteDirectorySilently(File dir) {
+		try {
+			FileUtils.deleteDirectory(dir);
+		}
+		catch (IOException ignored) {}
+	}
+	
+	private static boolean isDirectoryEmpty(File directory) {
+		String[] nested = directory.list();
+		return  nested == null || nested.length == 0;
+	}
+	
+	private static String localFileUri(File path) {
+		return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+	}
+	
+	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+		byte[] holder = new byte[data.length];
+		assertEquals("not enough data", holder.length, is.read(holder));
+		assertEquals("too much data", -1, is.read());
+		assertArrayEquals("wrong data", data, holder);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
new file mode 100644
index 0000000..3410d09
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}.
+ */
+public class MemoryStateBackendTest {
+	
+	@Test
+	public void testSerializableState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+			
+			StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
+			assertNotNull(handle);
+			
+			HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
+			assertEquals(state, restored);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testOversizedState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend(10);
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			try {
+				backend.checkpointStateSerializable(state, 12, 459);
+				fail("this should cause an exception");
+			}
+			catch (IOException e) {
+				// now darling, isn't that exactly what we wanted?
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStateStream() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+			ObjectOutputStream oos = new ObjectOutputStream(os);
+			oos.writeObject(state);
+			oos.flush();
+			StreamStateHandle handle = os.closeAndGetHandle();
+			
+			assertNotNull(handle);
+
+			ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
+			assertEquals(state, ois.readObject());
+			assertTrue(ois.available() <= 0);
+			ois.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testOversizedStateStream() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend(10);
+
+			HashMap<String, Integer> state = new HashMap<>();
+			state.put("hey there", 2);
+			state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+			StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+			ObjectOutputStream oos = new ObjectOutputStream(os);
+			
+			try {
+				oos.writeObject(state);
+				oos.flush();
+				os.closeAndGetHandle();
+				fail("this should cause an exception");
+			}
+			catch (IOException e) {
+				// oh boy! what an exception!
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testKeyValueState() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			
+			KvState<Integer, String, MemoryStateBackend> kv = 
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+			
+			assertEquals(0, kv.size());
+			
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertNull(kv.value());
+			kv.update("1");
+			assertEquals(1, kv.size());
+			kv.setCurrentKey(2);
+			assertNull(kv.value());
+			kv.update("2");
+			assertEquals(2, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("1", kv.value());
+			assertEquals(2, kv.size());
+			
+			// draw a snapshot
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = 
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+			
+			// make some more modifications
+			kv.setCurrentKey(1);
+			kv.update("u1");
+			kv.setCurrentKey(2);
+			kv.update("u2");
+			kv.setCurrentKey(3);
+			kv.update("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
+					kv.shapshot(682375462379L, System.currentTimeMillis());
+			
+			// validate the original state
+			assertEquals(3, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("u1", kv.value());
+			kv.setCurrentKey(2);
+			assertEquals("u2", kv.value());
+			kv.setCurrentKey(3);
+			assertEquals("u3", kv.value());
+			
+			// restore the first snapshot and validate it
+			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend, 
+							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(2, restored1.size());
+			restored1.setCurrentKey(1);
+			assertEquals("1", restored1.value());
+			restored1.setCurrentKey(2);
+			assertEquals("2", restored1.value());
+
+			// restore the second snapshot and validate it
+			KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+			assertEquals(3, restored2.size());
+			restored2.setCurrentKey(1);
+			assertEquals("u1", restored2.value());
+			restored2.setCurrentKey(2);
+			assertEquals("u2", restored2.value());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2.value());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRestoreWithWrongSerializers() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			KvState<Integer, String, MemoryStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+			
+			kv.setCurrentKey(1);
+			kv.update("1");
+			kv.setCurrentKey(2);
+			kv.update("2");
+			
+			KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
+					kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Integer> fakeIntSerializer = 
+					(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+			@SuppressWarnings("unchecked")
+			TypeSerializer<String> fakeStringSerializer = 
+					(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, IntSerializer.INSTANCE,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+
+			try {
+				snapshot.restoreState(backend, fakeIntSerializer,
+						fakeStringSerializer, null, getClass().getClassLoader());
+				fail("should recognize wrong serializers");
+			} catch (IllegalArgumentException e) {
+				// expected
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
deleted file mode 100644
index d6a8a54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class StateHandleTest {
-
-	@Test
-	public void operatorStateHandleTest() throws Exception {
-
-		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
-
-		OperatorStateHandle opHandle = new OperatorStateHandle(h1, true);
-		assertEquals(1, opHandle.getState(this.getClass().getClassLoader()));
-
-		OperatorStateHandle dsHandle = serializeDeserialize(opHandle);
-		MockHandle<Serializable> h2 = (MockHandle<Serializable>) dsHandle.getHandle();
-		assertFalse(h2.discarded);
-		assertNotNull(h1.state);
-		assertNull(h2.state);
-
-		dsHandle.discardState();
-
-		assertTrue(h2.discarded);
-	}
-
-	@Test
-	public void wrapperStateHandleTest() throws Exception {
-		final ClassLoader cl = this.getClass().getClassLoader();
-
-		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
-		MockHandle<Serializable> h2 = new MockHandle<Serializable>(2);
-		StateHandle<Serializable> h3 = new MockHandle<Serializable>(3);
-
-		OperatorStateHandle opH1 = new OperatorStateHandle(h1, true);
-		OperatorStateHandle opH2 = new OperatorStateHandle(h2, false);
-
-		Map<String, OperatorStateHandle> opHandles = ImmutableMap.of("h1", opH1, "h2", opH2);
-
-		Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> fullState = Tuple2.of(h3,
-				opHandles);
-
-		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = ImmutableList
-				.of(fullState);
-
-		WrapperStateHandle wrapperHandle = new WrapperStateHandle(chainedStates);
-
-		WrapperStateHandle dsWrapper = serializeDeserialize(wrapperHandle);
-
-		@SuppressWarnings("unchecked")
-		Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> dsFullState = ((List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) dsWrapper
-				.getState(cl)).get(0);
-
-		Map<String, OperatorStateHandle> dsOpHandles = dsFullState.f1;
-
-		assertNull(dsFullState.f0.getState(cl));
-		assertFalse(((MockHandle<?>) dsFullState.f0).discarded);
-		assertFalse(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
-		assertNull(dsOpHandles.get("h1").getState(cl));
-		assertFalse(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
-		assertNull(dsOpHandles.get("h2").getState(cl));
-
-		dsWrapper.discardState();
-
-		assertTrue(((MockHandle<?>) dsFullState.f0).discarded);
-		assertTrue(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
-		assertTrue(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
-
-	}
-
-	@SuppressWarnings("unchecked")
-	private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
-			ClassNotFoundException {
-		byte[] serialized = InstantiationUtil.serializeObject(handle);
-		return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
-				.getContextClassLoader());
-	}
-
-	@SuppressWarnings("serial")
-	private static class MockHandle<T> implements StateHandle<T> {
-
-		boolean discarded = false;
-		transient T state;
-
-		public MockHandle(T state) {
-			this.state = state;
-		}
-
-		@Override
-		public void discardState() {
-			state = null;
-			discarded = true;
-		}
-
-		@Override
-		public T getState(ClassLoader userCodeClassLoader) {
-			return state;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
deleted file mode 100644
index ead3af8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the functionality supported by stateful user functions for both
- * partitioned and non-partitioned user states. This test mimics the runtime
- * behavior of stateful stream operators.
- */
-public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void simpleStateTest() throws Exception {
-
-		List<String> out = new ArrayList<String>();
-
-		StreamMap<Integer, String> map = createOperatorWithContext(out, new ModKey(2), null);
-		StreamingRuntimeContext context = map.getRuntimeContext();
-
-		processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
-
-		assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
-		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", context.getOperatorState("concat", "", false).value());
-		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
-
-		byte[] serializedState0 = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
-		// Restore state but snapshot again before calling the value
-		byte[] serializedState = InstantiationUtil.serializeObject(createOperatorWithContext(out,
-				new ModKey(2), serializedState0).getStateSnapshotFromFunction(1, 1));
-
-		StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
-		StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
-		
-		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
-		assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
-		out.clear();
-
-		processInputs(restoredMap, Arrays.asList(7, 8));
-
-		assertEquals(Arrays.asList("7", "8"), out);
-		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
-		assertEquals(ImmutableMap.of(0, new MutableInt(3), 1, new MutableInt(4)), restoredContext.getOperatorStates().get("groupCounter")
-				.getPartitionedState());
-		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
-		assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
-
-	}
-
-	@Test
-	public void apiTest() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		KeyedStream<Integer, Integer> keyedStream = env
-				.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
-				.keyBy(new ModKey(4));
-
-		keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			public void invoke(String value) throws Exception {
-			}
-		});
-
-		keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			public void invoke(String value) throws Exception {
-			}
-		});
-
-		try {
-			keyedStream.shuffle();
-			fail();
-		} catch (UnsupportedOperationException e) {
-
-		}
-		
-		env.fromElements(0, 1, 2, 2, 2, 3, 4, 3, 4).keyBy(new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-
-		}).map(new PStateKeyRemovalTestMapper()).setParallelism(1).addSink(new SinkFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			public void invoke(String value) throws Exception {
-			}
-		});
-
-		env.execute();
-	}
-
-	private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception {
-		for (Integer i : input) {
-			map.getRuntimeContext().setNextInput(new StreamRecord<Integer>(i, 0L));
-			map.processElement(new StreamRecord<Integer>(i, 0L));
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private StreamMap<Integer, String> createOperatorWithContext(List<String> output,
-			KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception {
-		final List<String> outputList = output;
-
-		StreamingRuntimeContext context = new StreamingRuntimeContext(
-				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new ExecutionConfig(),
-				partitioner,
-				new LocalStateHandleProvider<Serializable>(),
-				new HashMap<String, Accumulator<?, ?>>(),
-				null);
-
-		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
-
-		op.setup(new Output<StreamRecord<String>>() {
-
-			@Override
-			public void collect(StreamRecord<String> record) {
-				outputList.add(record.getValue());
-			}
-
-			@Override
-			public void emitWatermark(Watermark mark) {
-
-			}
-
-			@Override
-			public void close() {
-			}
-		}, context);
-
-		if (serializedState != null) {
-			ClassLoader cl = Thread.currentThread().getContextClassLoader();
-			op.restoreInitialState((Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>) InstantiationUtil
-					.deserializeObject(serializedState, cl));
-		}
-
-		op.open(null);
-
-		return op;
-	}
-
-	public static class StatefulMapper extends RichMapFunction<Integer, String> implements
-			Checkpointed<Integer> {
-		private static final long serialVersionUID = -9007873655253339356L;
-		OperatorState<Integer> counter;
-		OperatorState<MutableInt> groupCounter;
-		OperatorState<String> concat;
-
-		Integer checkpointedCounter = 0;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			counter.update(counter.value() + 1);
-			MutableInt incremented = groupCounter.value();
-			incremented.increment();
-			groupCounter.update(incremented);
-			concat.update(concat.value() + value.toString());
-			checkpointedCounter++;
-			try {
-				counter.update(null);
-				fail();
-			} catch (RuntimeException e) {
-			}
-			return value.toString();
-		}
-
-		@Override
-		public void open(Configuration conf) throws IOException {
-			counter = getRuntimeContext().getOperatorState("counter", 0, false, intCheckpointer);
-			groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
-			concat = getRuntimeContext().getOperatorState("concat", "", false);
-			try {
-				getRuntimeContext().getOperatorState("test", null, true);
-				fail();
-			} catch (RuntimeException e) {
-			}
-			try {
-				getRuntimeContext().getOperatorState("test", null, true, null);
-				fail();
-			} catch (RuntimeException e) {
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void close() throws Exception {
-			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
-			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
-			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
-				Integer key = (Integer) count.getKey();
-				Integer expected = key < 3 ? 2 : 1;
-
-				assertEquals(new MutableInt(expected), count.getValue());
-			}
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return checkpointedCounter;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			this.checkpointedCounter = (Integer) state;
-		}
-	}
-
-	public static class StatefulMapper2 extends RichMapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-		OperatorState<Integer> groupCounter;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			groupCounter.update(groupCounter.value() + 1);
-
-			return value.toString();
-		}
-
-		@Override
-		public void open(Configuration conf) throws IOException {
-			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true, intCheckpointer);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void close() throws Exception {
-			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
-			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = 
-					(PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
-			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
-				Integer key = (Integer) count.getKey();
-				Integer expected = key < 3 ? 2 : 1;
-				assertEquals(expected, count.getValue());
-			}
-		}
-
-	}
-	
-	public static StateCheckpointer<Integer, String> intCheckpointer = new StateCheckpointer<Integer, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
-			return state.toString();
-		}
-
-		@Override
-		public Integer restoreState(String stateSnapshot) {
-			return Integer.parseInt(stateSnapshot);
-		}
-	};
-
-	public static class PStateKeyRemovalTestMapper extends RichMapFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-		OperatorState<Boolean> seen;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			if (value == 0) {
-				seen.update(null);
-			}else{
-				Boolean s = seen.value();
-				if (s) {
-					seen.update(null);
-				} else {
-					seen.update(true);
-				}
-			}
-
-			return value.toString();
-		}
-
-		public void open(Configuration c) throws IOException {
-			seen = getRuntimeContext().getOperatorState("seen", false, true);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void close() throws Exception {
-			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
-			PartitionedStreamOperatorState<Integer, Boolean, Boolean> seen = (PartitionedStreamOperatorState<Integer, Boolean, Boolean>) states.get("seen");
-			assertFalse(seen.getPartitionedState().containsKey(0));
-			assertEquals(2,seen.getPartitionedState().size());
-			for (Entry<Serializable, Boolean> s : seen.getPartitionedState().entrySet()) {
-					assertTrue(s.getValue());
-			}
-		}
-
-	}
-
-	public static class ModKey implements KeySelector<Integer, Integer> {
-
-		private static final long serialVersionUID = 4193026742083046736L;
-
-		int base;
-
-		public ModKey(int base) {
-			this.base = base;
-		}
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			return value % base;
-		}
-
-	}
-
-}