You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:35 UTC

[08/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index dc9a152..7a1bea4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -19,26 +19,17 @@
 package org.apache.flink.streaming.api.operators;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
-import org.apache.flink.streaming.api.state.StreamOperatorState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * This is used as the base class for operators that have a user-defined
@@ -50,22 +41,20 @@ import org.slf4j.LoggerFactory;
  * @param <F>
  *            The type of the user function
  */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function> 
-		extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> {
 
 	private static final long serialVersionUID = 1L;
 	
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-
+	
 	/** the user function */
 	protected final F userFunction;
 	
 	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
-	private boolean functionsClosed = false;
-
+	private transient boolean functionsClosed = false;
+	
 	
 	public AbstractUdfStreamOperator(F userFunction) {
-		this.userFunction = Objects.requireNonNull(userFunction);
+		this.userFunction = requireNonNull(userFunction);
 	}
 
 	/**
@@ -79,18 +68,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 	// ------------------------------------------------------------------------
 	//  operator life cycle
 	// ------------------------------------------------------------------------
-	
-	@Override
-	public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
-		super.setup(output, runtimeContext);
-		FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
-	}
-
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		FunctionUtils.openFunction(userFunction, parameters);
+	public void open() throws Exception {
+		super.open();
+		
+		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
+		FunctionUtils.openFunction(userFunction, new Configuration());
 	}
 
 	@Override
@@ -118,76 +102,81 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 	// ------------------------------------------------------------------------
 	
 	@Override
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
 
-		// Restore state using the Checkpointed interface
-		if (userFunction instanceof Checkpointed && snapshots.f0 != null) {
-			((Checkpointed) userFunction).restoreState(snapshots.f0.getState(runtimeContext.getUserCodeClassLoader()));
-		}
-		
-		if (snapshots.f1 != null) {
-			// We iterate over the states registered for this operator, initialize and restore it
-			for (Entry<String, OperatorStateHandle> snapshot : snapshots.f1.entrySet()) {
-				StreamOperatorState restoredOpState = runtimeContext.getState(snapshot.getKey(), snapshot.getValue().isPartitioned());
-				StateHandle<Serializable> checkpointHandle = snapshot.getValue();
-				restoredOpState.restoreState(checkpointHandle, runtimeContext.getUserCodeClassLoader());
+		if (userFunction instanceof Checkpointed) {
+			@SuppressWarnings("unchecked")
+			Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
+			
+			Serializable udfState;
+			try {
+				udfState = chkFunction.snapshotState(checkpointId, timestamp);
+			} 
+			catch (Exception e) {
+				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
+			}
+			
+			if (udfState != null) {
+				try {
+					StateBackend<?> stateBackend = getStateBackend();
+					StateHandle<Serializable> handle = 
+							stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
+					state.setFunctionState(handle);
+				}
+				catch (Exception e) {
+					throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
+							+ e.getMessage(), e);
+				}
 			}
 		}
 		
+		return state;
 	}
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
-			throws Exception {
-		// Get all the states for the operator
-		Map<String, StreamOperatorState<?, ?>> operatorStates = runtimeContext.getOperatorStates();
+	@Override
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 		
-		Map<String, OperatorStateHandle> operatorStateSnapshots;
-		if (operatorStates.isEmpty()) {
-			// We return null to signal that there is nothing to checkpoint
-			operatorStateSnapshots = null;
-		} else {
-			// Checkpoint the states and store the handles in a map
-			Map<String, OperatorStateHandle> snapshots = new HashMap<String, OperatorStateHandle>();
-
-			for (Entry<String, StreamOperatorState<?, ?>> state : operatorStates.entrySet()) {
-				boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState;
-				snapshots.put(state.getKey(),
-						new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp),
-								isPartitioned));
-			}
-
-			operatorStateSnapshots = snapshots;
-		}
+		StateHandle<Serializable> stateHandle =  state.getFunctionState();
 		
-		StateHandle<Serializable> checkpointedSnapshot = null;
-		// if the UDF implements the Checkpointed interface we draw a snapshot
-		if (userFunction instanceof Checkpointed) {
-			StateHandleProvider<Serializable> provider = runtimeContext.getStateHandleProvider();
-			Serializable state = ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp);
-			if (state != null) {
-				checkpointedSnapshot = provider.createStateHandle(state);
+		if (userFunction instanceof Checkpointed && stateHandle != null) {
+			@SuppressWarnings("unchecked")
+			Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
+			
+			Serializable functionState = stateHandle.getState(getUserCodeClassloader());
+			if (functionState != null) {
+				try {
+					chkFunction.restoreState(functionState);
+				}
+				catch (Exception e) {
+					throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
+				}
 			}
 		}
-		
-		// if we have either operator or checkpointed state we store it in a
-		// tuple2 otherwise return null
-		if (operatorStateSnapshots != null || checkpointedSnapshot != null) {
-			return Tuple2.of(checkpointedSnapshot, operatorStateSnapshots);
-		} else {
-			return null;
-		}
-
 	}
 
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+	@Override
+	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+		super.notifyOfCompletedCheckpoint(checkpointId);
+
 		if (userFunction instanceof CheckpointNotifier) {
-			try {
-				((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
-			} catch (Exception e) {
-				throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
-			}
+			((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the configuration parameters of the user function.
+	 * Since the streaming API does not implement any parametrization of functions via a
+	 * configuration, the config returned here is actually empty.
+	 * 
+	 * @return The user function parameters (currently empty)
+	 */
+	public Configuration getUserFunctionParameters() {
+		return new Configuration();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
new file mode 100644
index 0000000..3a752b0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * Defines the chaining scheme for the operator.
+ * By default {@link #ALWAYS} is used, which means operators will be eagerly chained whenever possible.
+ */
+public enum ChainingStrategy {
+
+	/**
+	 * Chaining will happen even if chaining is disabled on the execution environment.
+	 * This should only be used by system-level operators, not operators implemented by users.
+	 */
+	FORCE_ALWAYS,
+
+	/** 
+	 * Operators will be eagerly chained whenever possible, for
+	 * maximal performance. It is generally a good practice to allow maximal
+	 * chaining and increase operator parallelism.
+	 */
+	ALWAYS,
+
+	/**
+	 * The operator will not be chained to the preceding or succeeding operators.
+	 */
+	NEVER,
+	
+	
+	HEAD
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
index 7ca540f..705c1b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
@@ -35,7 +35,7 @@ public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
 	 * Processes one element that arrived at this operator.
 	 * This method is guaranteed to not be called concurrently with other methods of the operator.
 	 */
-	public void processElement(StreamRecord<IN> element) throws Exception;
+	void processElement(StreamRecord<IN> element) throws Exception;
 
 	/**
 	 * Processes a {@link Watermark}.
@@ -43,5 +43,5 @@ public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
 	 *
 	 * @see org.apache.flink.streaming.api.watermark.Watermark
 	 */
-	public void processWatermark(Watermark mark) throws Exception;
+	void processWatermark(Watermark mark) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index b68432604..0cbc954 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -34,7 +34,7 @@ public interface Output<T> extends Collector<T> {
 	 * operators.
 	 *
 	 * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
-	 * timestamp will be emitted in the future.</p>
+	 * timestamp will be emitted in the future.
 	 */
 	void emitWatermark(Watermark mark);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
deleted file mode 100644
index d400fc4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-
-/**
- * Interface for Stream operators that can have state. This interface is used for checkpointing
- * and restoring that state.
- *
- * @param <OUT> The output type of the operator
- */
-public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
-
-	void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state) throws Exception;
-
-	Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
-
-	void notifyCheckpointComplete(long checkpointId) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index ff7f662..23b638e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -36,8 +35,8 @@ public class StreamFlatMap<IN, OUT>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
+	public void open() throws Exception {
+		super.open();
 		collector = new TimestampedCollector<OUT>(output);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 732630a..79e319a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -22,53 +22,41 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.state.KVMapCheckpointer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
 
+public class StreamGroupedFold<IN, OUT, KEY>
+		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	// Grouped values
-	private KeySelector<IN, ?> keySelector;
-	private transient OperatorState<HashMap<Object, OUT>> values;
-
+	private transient OperatorState<OUT> values;
+	
+	private transient OUT initialValue;
+	
 	// Initial value serialization
 	private byte[] serializedInitialValue;
+	
 	private TypeSerializer<OUT> outTypeSerializer;
-	private transient OUT initialValue;
-
-	// Store the typeinfo, create serializer during runtime
-	private TypeInformation<Object> keyTypeInformation;
-
-	@SuppressWarnings("unchecked")
-	public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
-								OUT initialValue, TypeInformation<IN> inTypeInformation) {
+	
+	public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(folder);
-		this.keySelector = keySelector;
 		this.initialValue = initialValue;
-		keyTypeInformation = (TypeInformation<Object>) TypeExtractor
-				.getKeySelectorTypes(keySelector, inTypeInformation);
-
 	}
 
 	@Override
-	public void open(Configuration configuration) throws Exception {
-		super.open(configuration);
+	public void open() throws Exception {
+		super.open();
 
 		if (serializedInitialValue == null) {
 			throw new RuntimeException("No initial value was serialized for the fold " +
@@ -80,25 +68,20 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
 				new DataInputStream(bais)
 		);
 		initialValue = outTypeSerializer.deserialize(in);
-
-		values = runtimeContext.getOperatorState("flink_internal_fold_values",
-				new HashMap<Object, OUT>(), false,
-				new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
-						outTypeSerializer));
+		values = createKeyValueState(outTypeSerializer, null);
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Object key = keySelector.getKey(element.getValue());
-		OUT value = values.value().get(key);
+		OUT value = values.value();
 
 		if (value != null) {
 			OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
-			values.value().put(key, folded);
+			values.update(folded);
 			output.collect(element.replace(folded));
 		} else {
 			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
-			values.value().put(key, first);
+			values.update(first);
 			output.collect(element.replace(first));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 579814d..ebc4b09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -19,61 +19,43 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.state.KVMapCheckpointer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.util.HashMap;
-
 public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
 		implements OneInputStreamOperator<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
+	
+	private transient OperatorState<IN> values;
+	
+	private TypeSerializer<IN> serializer;
 
-	private KeySelector<IN, ?> keySelector;
-	private transient OperatorState<HashMap<Object, IN>> values;
-
-	// Store the typeinfo, create serializer during runtime
-	private TypeInformation<Object> keyTypeInformation;
-	private TypeInformation<IN> valueTypeInformation;
-
-	@SuppressWarnings("unchecked")
-	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector,
-								TypeInformation<IN> typeInformation) {
+	
+	public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
 		super(reducer);
-		this.keySelector = keySelector;
-		valueTypeInformation = typeInformation;
-		keyTypeInformation = (TypeInformation<Object>) TypeExtractor
-				.getKeySelectorTypes(keySelector, typeInformation);
+		this.serializer = serializer;
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		values = runtimeContext.getOperatorState("flink_internal_reduce_values",
-				new HashMap<Object, IN>(), false,
-				new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
-						valueTypeInformation.createSerializer(executionConfig)));
+	public void open() throws Exception {
+		super.open();
+		values = createKeyValueState(serializer, null);
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Object key = keySelector.getKey(element.getValue());
-
-		IN currentValue = values.value().get(key);
+		IN value = element.getValue();
+		IN currentValue = values.value();
+		
 		if (currentValue != null) {
-			// TODO: find a way to let operators copy elements (maybe)
-			IN reduced = userFunction.reduce(currentValue, element.getValue());
-			values.value().put(key, reduced);
+			IN reduced = userFunction.reduce(currentValue, value);
+			values.update(reduced);
 			output.collect(element.replace(reduced));
 		} else {
-			values.value().put(key, element.getValue());
-			output.collect(element.replace(element.getValue()));
+			values.update(value);
+			output.collect(element.replace(value));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d65dc64..fac26f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,9 +19,10 @@ package org.apache.flink.streaming.api.operators;
 
 import java.io.Serializable;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 /**
  * Basic interface for stream operators. Implementers would implement one of
@@ -29,27 +30,25 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
  * that process elements.
  * 
- * <p>
- * The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
+ * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
  * offers default implementation for the lifecycle and properties methods.
  *
- * <p>
- * Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
+ * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
  * the timer service, timer callbacks are also guaranteed not to be called concurrently with
  * methods on {@code StreamOperator}.
  * 
  * @param <OUT> The output type of the operator
  */
 public interface StreamOperator<OUT> extends Serializable {
-
+	
 	// ------------------------------------------------------------------------
-	//  Life Cycle
+	//  life cycle
 	// ------------------------------------------------------------------------
 	
 	/**
 	 * Initializes the operator. Sets access to the context and the output.
 	 */
-	void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);
+	void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
 
 	/**
 	 * This method is called immediately before any elements are processed, it should contain the
@@ -57,7 +56,7 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * 
 	 * @throws java.lang.Exception An exception in this method causes the operator to fail.
 	 */
-	void open(Configuration config) throws Exception;
+	void open() throws Exception;
 
 	/**
 	 * This method is called after all records have been added to the operators via the methods
@@ -82,43 +81,66 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * that the operator has acquired.
 	 */
 	void dispose();
-	
 
 	// ------------------------------------------------------------------------
-	//  Context and chaining properties
+	//  state snapshots
 	// ------------------------------------------------------------------------
+
+	/**
+	 * Called to draw a state snapshot from the operator. This method snapshots the operator state
+	 * (if the operator is stateful) and the key/value state (if it is being used and has been
+	 * initialized).
+	 *
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 *
+	 * @return The StreamTaskState object, possibly containing the snapshots for the
+	 *         operator and key/value state.
+	 *
+	 * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
+	 *                   and the key/value state.
+	 */
+	StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
 	
 	/**
-	 * Returns a context that allows the operator to query information about the execution and also
-	 * to interact with systems such as broadcast variables and managed state. This also allows
-	 * to register timers.
+	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
+	 * This method restores the operator state (if the operator is stateful) and the key/value state
+	 * (if it had been used and was initialized when the snapshot occurred).
+	 *
+	 * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
+	 * and before {@link #open()}.
+	 *
+	 * @param state The state of the operator that was snapshotted as part of the checkpoint
+	 *              from which the execution is restored.
+	 *
+	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
+	 *                   properly react to failed state restore and fail the execution attempt.
 	 */
-	StreamingRuntimeContext getRuntimeContext();
+	void restoreState(StreamTaskState state) throws Exception;
 
 	/**
+	 * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
+	 *
+	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 *
+	 * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
+	 *                   the program to fail and enter recovery.
+	 */
+	void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous
+	// ------------------------------------------------------------------------
+	
+	void setKeyContextElement(StreamRecord<?> record) throws Exception;
+	
+	/**
 	 * An operator can return true here to disable copying of its input elements. This overrides
 	 * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
 	 */
 	boolean isInputCopyingDisabled();
-
-	void setChainingStrategy(ChainingStrategy strategy);
-
+	
 	ChainingStrategy getChainingStrategy();
 
-	/**
-	 * Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used,
-	 * which means operators will be eagerly chained whenever possible, for
-	 * maximal performance. It is generally a good practice to allow maximal
-	 * chaining and increase operator parallelism. </p> When the strategy is set
-	 * to <b>NEVER</b>, the operator will not be chained to the preceding or succeeding
-	 * operators.</p> <b>HEAD</b> strategy marks a start of a new chain, so that the
-	 * operator will not be chained to preceding operators, only succeding ones.
-	 *
-	 * <b>FORCE_ALWAYS</b> will enable chaining even if chaining is disabled on the execution
-	 * environment. This should only be used by system-level operators, not operators implemented
-	 * by users.
-	 */
-	public static enum ChainingStrategy {
-		FORCE_ALWAYS, ALWAYS, NEVER, HEAD
-	}
+	void setChainingStrategy(ChainingStrategy strategy);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index c0815b5..1ce4ff6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -53,8 +52,8 @@ public class StreamProject<IN, OUT extends Tuple>
 	}
 
 	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
+	public void open() throws Exception {
+		super.open();
 		outTuple = outSerializer.createInstance();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index ecf799b..fbecbd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -43,7 +43,8 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 	}
 
 	public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
-
+		final ExecutionConfig executionConfig = getExecutionConfig();
+		
 		if (userFunction instanceof EventTimeSourceFunction) {
 			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
 		} else if (executionConfig.getAutoWatermarkInterval() > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
new file mode 100644
index 0000000..40998dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
+ * for streaming operators.
+ */
+public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
+
+	/** The operator to which this function belongs */
+	private final AbstractStreamOperator<?> operator;
+	
+	/** The task environment running the operator */
+	private final Environment taskEnvironment;
+	
+	/** The key/value state, if the user-function requests it */
+	private OperatorState<?> keyValueState;
+	
+	/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
+	private TypeInformation<?> stateTypeInfo;
+	
+	
+	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
+									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
+		super(env.getTaskName(),
+				env.getNumberOfSubtasks(),
+				env.getIndexInSubtaskGroup(),
+				env.getUserClassLoader(),
+				operator.getExecutionConfig(),
+				accumulators,
+				env.getDistributedCacheEntries());
+		
+		this.operator = operator;
+		this.taskEnvironment = env;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Returns the input split provider associated with the operator.
+	 * 
+	 * @return The input split provider.
+	 */
+	public InputSplitProvider getInputSplitProvider() {
+		return taskEnvironment.getInputSplitProvider();
+	}
+
+	/**
+	 * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
+	 * This call is guaranteed to not happen concurrently with method calls on the operator.
+	 *
+	 * @param time The absolute time in milliseconds.
+	 * @param target The target to be triggered.
+	 */
+	public void registerTimer(long time, Triggerable target) {
+		operator.registerTimer(time, target);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  broadcast variables
+	// ------------------------------------------------------------------------
+
+	@Override
+	public <RT> List<RT> getBroadcastVariable(String name) {
+		throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
+	}
+
+	@Override
+	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+		throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
+	}
+
+	// ------------------------------------------------------------------------
+	//  key/value state
+	// ------------------------------------------------------------------------
+
+	@Override
+	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+		requireNonNull(stateType, "The state type class must not be null");
+		// derive the TypeInformation from the class, then delegate to the TypeInformation-based overload
+		TypeInformation<S> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForClass(stateType);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot analyze type '" + stateType.getName() + 
+					"' from the class alone, due to generic type parameters. " +
+					"Please specify the TypeInformation directly.", e); // keep the root cause
+		}
+		
+		return getKeyValueState(typeInfo, defaultState);
+	}
+
+	@Override
+	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+		requireNonNull(stateType, "The state type information must not be null");
+		
+		// check if this is a repeated call to access the state 
+		if (this.stateTypeInfo != null && this.keyValueState != null) {
+			// repeated call
+			if (this.stateTypeInfo.equals(stateType)) {
+				// valid case, same type requested again
+				@SuppressWarnings("unchecked")
+				OperatorState<S> previous = (OperatorState<S>) this.keyValueState;
+				return previous;
+			}
+			else {
+				// invalid case, different type requested this time
+				throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
+						" ; The key/value state has already been created and initialized for a different type: " +
+						this.stateTypeInfo);
+			}
+		}
+		else {
+			// first time access to the key/value state
+			try {
+				OperatorState<S> state = operator.createKeyValueState(stateType, defaultState);
+				this.keyValueState = state;
+				this.stateTypeInfo = stateType;
+				return state;
+			}
+			catch (RuntimeException e) {
+				throw e;
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Cannot initialize the key/value state", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index cbf59c1..806cef2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.operators.co;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -44,8 +43,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
+	public void open() throws Exception {
+		super.open();
 		collector = new TimestampedCollector<OUT>(output);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
new file mode 100644
index 0000000..b974674
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
+
+	/** Map containing the actual key/value pairs */
+	private final HashMap<K, V> state;
+	
+	/** The serializer for the keys */
+	private final TypeSerializer<K> keySerializer;
+
+	/** The serializer for the values */
+	private final TypeSerializer<V> valueSerializer;
+	
+	/** The value that is returned when no other value has been associated with a key, yet */
+	private final V defaultValue;
+	
+	/** The current key, which the next value methods will refer to */
+	private K currentKey;
+	
+	/**
+	 * Creates a new empty key/value state.
+	 * 
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 */
+	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+									TypeSerializer<V> valueSerializer,
+									V defaultValue) {
+		this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
+	}
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 * 
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param state The state map to use in this key/value state. May contain initial state.
+	 */
+	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+									TypeSerializer<V> valueSerializer,
+									V defaultValue,
+									HashMap<K, V> state) {
+		this.state = requireNonNull(state);
+		this.keySerializer = requireNonNull(keySerializer);
+		this.valueSerializer = requireNonNull(valueSerializer);
+		this.defaultValue = defaultValue;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public V value() {
+		V value = state.get(currentKey);
+		return value != null ? value : defaultValue;
+	}
+
+	@Override
+	public void update(V value) {
+		if (value != null) {
+			state.put(currentKey, value);
+		}
+		else {
+			state.remove(currentKey);
+		}
+	}
+
+	@Override
+	public void setCurrentKey(K currentKey) {
+		this.currentKey = currentKey;
+	}
+
+	@Override
+	public int size() {
+		return state.size();
+	}
+
+	@Override
+	public void dispose() {
+		state.clear();
+	}
+
+	/**
+	 * Gets the serializer for the keys.
+	 * @return The serializer for the keys.
+	 */
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * Gets the serializer for the values.
+	 * @return The serializer for the values.
+	 */
+	public TypeSerializer<V> getValueSerializer() {
+		return valueSerializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpointing utilities
+	// ------------------------------------------------------------------------
+	
+	protected void writeStateToOutputView(final DataOutputView out) throws IOException {
+		for (Map.Entry<K, V> entry : state.entrySet()) {
+			keySerializer.serialize(entry.getKey(), out);
+			valueSerializer.serialize(entry.getValue(), out);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
deleted file mode 100644
index 14d1504..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-
-public class BasicCheckpointer implements StateCheckpointer<Serializable, Serializable> {
-
-	@Override
-	public Serializable snapshotState(Serializable state, long checkpointId, long checkpointTimestamp) {
-		return state;
-	}
-
-	@Override
-	public Serializable restoreState(Serializable stateSnapshot) {
-		return stateSnapshot;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
deleted file mode 100644
index 2091624..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-
-public class EagerStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
-
-	private StateCheckpointer<S, C> checkpointer;
-	private final StateHandleProvider<Serializable> provider;
-
-	private Map<Serializable, S> fetchedState;
-
-	@SuppressWarnings("unchecked")
-	public EagerStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
-		this.checkpointer = checkpointer;
-		this.provider = (StateHandleProvider<Serializable>) provider;
-
-		fetchedState = new HashMap<Serializable, S>();
-	}
-
-	@Override
-	public S getStateForKey(Serializable key) throws IOException {
-		return fetchedState.get(key);
-	}
-
-	@Override
-	public void setStateForKey(Serializable key, S state) {
-		fetchedState.put(key, state);
-	}
-
-	@Override
-	public void removeStateForKey(Serializable key) {
-		fetchedState.remove(key);
-	}
-
-	@Override
-	public Map<Serializable, S> getPartitionedState() throws IOException {
-		return fetchedState;
-	}
-
-	@Override
-	public StateHandle<Serializable> snapshotStates(long checkpointId, long checkpointTimestamp) {
-		// we map the values in the state-map using the state-checkpointer and store it as a checkpoint
-		Map<Serializable, C> checkpoints = new HashMap<Serializable, C>();
-		for (Entry<Serializable, S> stateEntry : fetchedState.entrySet()) {
-			checkpoints.put(stateEntry.getKey(),
-					checkpointer.snapshotState(stateEntry.getValue(), checkpointId, checkpointTimestamp));
-		}
-		return provider.createStateHandle((Serializable) checkpoints);
-	}
-
-	@Override
-	public void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader)
-			throws Exception {
-
-		@SuppressWarnings("unchecked")
-		Map<Serializable, C> checkpoints = (Map<Serializable, C>) snapshot.getState(userCodeClassLoader);
-
-		// we map the values back to the state from the checkpoints
-		for (Entry<Serializable, C> snapshotEntry : checkpoints.entrySet()) {
-			fetchedState.put(snapshotEntry.getKey(), (S) checkpointer.restoreState(snapshotEntry.getValue()));
-		}
-	}
-
-	@Override
-	public boolean containsKey(Serializable key) {
-		return fetchedState.containsKey(key);
-	}
-
-	@Override
-	public void setCheckPointer(StateCheckpointer<S, C> checkpointer) {
-		this.checkpointer = checkpointer;
-	}
-
-	@Override
-	public String toString() {
-		return fetchedState.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
deleted file mode 100644
index 17cb6a0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implementation of the {@link StateCheckpointer} interface for a map storing
- * types compatible with Flink's serialization system.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class KVMapCheckpointer<K, V> implements StateCheckpointer<HashMap<K, V>, byte[]> {
-
-	private TypeSerializer<K> keySerializer;
-	private TypeSerializer<V> valueSerializer;
-
-	public KVMapCheckpointer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
-		this.keySerializer = keySerializer;
-		this.valueSerializer = valueSerializer;
-	}
-
-	@Override
-	public byte[] snapshotState(HashMap<K, V> stateMap, long checkpointId, long checkpointTimestamp) {
-		ByteArrayOutputStream bos = new ByteArrayOutputStream(stateMap.size() * 16);
-		DataOutputView out = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
-		try {
-			out.writeInt(stateMap.size());
-			for (Map.Entry<K, V> kv : stateMap.entrySet()) {
-				keySerializer.serialize(kv.getKey(), out);
-				valueSerializer.serialize(kv.getValue(), out);
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to write snapshot", e);
-		}
-		return bos.toByteArray();
-	}
-
-	@Override
-	public HashMap<K, V> restoreState(byte[] stateSnapshot) {
-		ByteArrayInputView in = new ByteArrayInputView(stateSnapshot);
-
-		HashMap<K, V> returnMap = new HashMap<>();
-		try {
-			int size = in.readInt();
-			for (int i = 0; i < size; i++) {
-				returnMap.put(keySerializer.deserialize(in), valueSerializer.deserialize(in));
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to read snapshot", e);
-		}
-
-		return returnMap;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
new file mode 100644
index 0000000..9c628f8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.state.OperatorState;
+
+/**
+ * Key/Value state implementation for user-defined state. The state is backed by a state
+ * backend, which typically follows one of the following patterns: Either the state is stored
+ * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
+ * state backend into some store (during checkpoints), or the key/value state is in fact backed
+ * by an external key/value store as the state backend, and checkpoints merely record the
+ * metadata of what is considered part of the checkpoint.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
+
+	/**
+	 * Sets the current key, which will be used to retrieve values for the next calls to
+	 * {@link #value()} and {@link #update(Object)}.
+	 * 
+	 * @param key The key.
+	 */
+	void setCurrentKey(K key);
+
+	/**
+	 * Creates a snapshot of this state.
+	 * 
+	 * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return A snapshot handle for this key/value state.
+	 * 
+	 * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
+	 *                   can react to failed snapshots.
+	 */
+	KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Gets the number of key/value pairs currently stored in the state. Note that if a key
+	 * has been associated with "null", the key is removed from the state and will not
+	 * be counted here.
+	 *
+	 * @return The number of key/value pairs currently stored in the state.
+	 */
+	int size();
+
+	/**
+	 * Disposes the key/value state, releasing all occupied resources.
+	 */
+	void dispose();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
new file mode 100644
index 0000000..6aa7a1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
+ * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
+ * on the actual implementation. This snapshot defines merely how to restore the state and
+ * how to discard the state.
+ *
+ * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
+ * 
+ * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
+ * a file and this snapshot object contains a pointer to that file.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ * @param <Backend> The type of the backend that can restore the state from this snapshot.
+ */
+public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
+
+	/**
+	 * Loads the key/value state back from this snapshot.
+	 * 
+	 * 
+	 * @param stateBackend The state backend that created this snapshot and can restore the key/value state
+	 *                     from this snapshot.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.   
+	 * @param classLoader The class loader for user-defined types.
+	 * 
+	 * @return An instance of the key/value state loaded from this snapshot.
+	 * 
+	 * @throws Exception Exceptions can occur during the state loading and are forwarded. 
+	 */
+	KvState<K, V, Backend> restoreState(
+			Backend stateBackend,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			ClassLoader classLoader) throws Exception;
+
+
+	/**
+	 * Discards the state snapshot, removing any resources occupied by it.
+	 * 
+	 * @throws Exception Exceptions occurring during the state disposal should be forwarded.
+	 */
+	void discardState() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
deleted file mode 100644
index 0c0b2c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-public class OperatorStateHandle implements StateHandle<Serializable> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final StateHandle<Serializable> handle;
-	private final boolean isPartitioned;
-	
-	public OperatorStateHandle(StateHandle<Serializable> handle, boolean isPartitioned){
-		this.handle = handle;
-		this.isPartitioned = isPartitioned;
-	}
-	
-	public boolean isPartitioned(){
-		return isPartitioned;
-	}
-
-	@Override
-	public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
-		return handle.getState(userCodeClassLoader);
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		handle.discardState();
-	}
-	
-	public StateHandle<Serializable> getHandle() {
-		return handle;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
deleted file mode 100644
index 34bfde7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStateStore.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * Interface for storing and accessing partitioned state. The interface is
- * designed in a way that allows implementations for lazily state access.
- * 
- * @param <S>
- *            Type of the state.
- * @param <C>
- *            Type of the state snapshot.
- */
-public interface PartitionedStateStore<S, C extends Serializable> {
-
-	S getStateForKey(Serializable key) throws IOException;
-
-	void setStateForKey(Serializable key, S state);
-	
-	void removeStateForKey(Serializable key);
-
-	Map<Serializable, S> getPartitionedState() throws IOException;
-
-	StateHandle<Serializable> snapshotStates(long checkpointId, long checkpointTimestamp) throws IOException;
-
-	void restoreStates(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception;
-
-	boolean containsKey(Serializable key);
-	
-	void setCheckPointer(StateCheckpointer<S, C> checkpointer);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
deleted file mode 100644
index 408a0f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Implementation of the {@link OperatorState} interface for partitioned user
- * states. It provides methods for checkpointing and restoring partitioned
- * operator states upon failure.
- * 
- * @param <IN>
- *            Input type of the underlying {@link OneInputStreamOperator}
- * @param <S>
- *            Type of the underlying {@link OperatorState}.
- * @param <C>
- *            Type of the state snapshot.
- */
-public class PartitionedStreamOperatorState<IN, S, C extends Serializable> extends StreamOperatorState<S, C> {
-
-	// KeySelector for getting the state partition key for each input
-	private final KeySelector<IN, Serializable> keySelector;
-
-	private final PartitionedStateStore<S, C> stateStore;
-
-	private byte[] defaultState;
-
-	// The currently processed input, used to extract the appropriate key
-	private IN currentInput;
-
-	private ClassLoader cl;
-	private boolean restored = true;
-	private StateHandle<Serializable> checkpoint = null;
-
-	public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
-			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
-		super(checkpointer, provider);
-		this.keySelector = keySelector;
-		this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
-		this.cl = cl;
-	}
-
-	@SuppressWarnings("unchecked")
-	public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
-			KeySelector<IN, Serializable> keySelector, ClassLoader cl) {
-		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector, cl);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public S value() throws IOException {
-		if (currentInput == null) {
-			throw new IllegalStateException("Need a valid input for accessing the state.");
-		} else {
-			if (!restored) {
-				// If the state is not restored yet, restore now
-				restoreWithCheckpointer();
-			}
-			Serializable key;
-			try {
-				key = keySelector.getKey(currentInput);
-			} catch (Exception e) {
-				throw new RuntimeException("User-defined key selector threw an exception.", e);
-			}
-			if (stateStore.containsKey(key)) {
-				return stateStore.getStateForKey(key);
-			} else {
-				try {
-					return (S) checkpointer.restoreState((C) InstantiationUtil.deserializeObject(
-							defaultState, cl));
-				} catch (ClassNotFoundException e) {
-					throw new RuntimeException("Could not deserialize default state value.", e);
-				}
-			}
-		}
-	}
-
-	@Override
-	public void update(S state) throws IOException {
-		if (currentInput == null) {
-			throw new IllegalStateException("Need a valid input for updating a state.");
-		} else {
-			if (!restored) {
-				// If the state is not restored yet, restore now
-				restoreWithCheckpointer();
-			}
-			Serializable key;
-			try {
-				key = keySelector.getKey(currentInput);
-			} catch (Exception e) {
-				throw new RuntimeException("User-defined key selector threw an exception.");
-			}
-			
-			if (state == null) {
-				// Remove state if set to null
-				stateStore.removeStateForKey(key);
-			} else {
-				stateStore.setStateForKey(key, state);
-			}
-		}
-	}
-
-	@Override
-	public void setDefaultState(S defaultState) {
-		try {
-			this.defaultState = InstantiationUtil.serializeObject(checkpointer.snapshotState(defaultState, 0, 0));
-		} catch (IOException e) {
-			throw new RuntimeException("Default state must be serializable.");
-		}
-	}
-
-	public void setCurrentInput(IN input) {
-		currentInput = input;
-	}
-
-	@Override
-	public StateHandle<Serializable> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		// If the state is restored we take a snapshot, otherwise return the last checkpoint
-		return restored ? stateStore.snapshotStates(checkpointId, checkpointTimestamp) : provider
-				.createStateHandle(checkpoint.getState(cl));
-	}
-	
-	@Override
-	public void restoreState(StateHandle<Serializable> snapshot, ClassLoader userCodeClassLoader) throws Exception {
-		// We store the snapshot for lazy restore
-		checkpoint = snapshot;
-		restored = false;
-	}
-	
-	private void restoreWithCheckpointer() throws IOException {
-		try {
-			stateStore.restoreStates(checkpoint, cl);
-		} catch (Exception e) {
-			throw new IOException(e);
-		}
-		restored = true;
-		checkpoint = null;
-	}
-
-	@Override
-	public Map<Serializable, S> getPartitionedState() throws Exception {
-		return stateStore.getPartitionedState();
-	}
-	
-	@Override
-	public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
-		super.setCheckpointer(checkpointer);
-		stateStore.setCheckPointer(checkpointer);
-	}
-
-	@Override
-	public String toString() {
-		return stateStore.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
new file mode 100644
index 0000000..b4fce7e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A state backend defines how state is stored and snapshotted during checkpoints.
+ * 
+ * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
+ *                  type of backend when creating state backed by this backend.
+ */
+public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 4620413814639220247L;
+	
+	// ------------------------------------------------------------------------
+	//  initialization and cleanup
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * This method is called by the task upon deployment to initialize the state backend for
+	 * the data of a specific job.
+	 * 
+	 * @param job The ID of the job for which the state backend instance checkpoints data.
+	 * @throws Exception Implementations of this method may throw exceptions, in which
+	 *                   case the job that uses the state backend is considered failed during
+	 *                   deployment.
+	 */
+	public abstract void initializeForJob(JobID job) throws Exception;
+
+	/**
+	 * Disposes all state associated with the current job.
+	 * 
+	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
+	 */
+	public abstract void disposeAllStateForCurrentJob() throws Exception;
+	
+	// ------------------------------------------------------------------------
+	//  key/value state
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a key/value state backed by this state backend.
+	 * 
+	 * @param keySerializer The serializer for the key.
+	 * @param valueSerializer The serializer for the value.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param <K> The type of the key.
+	 * @param <V> The type of the value.
+	 * 
+	 * @return A new key/value state backed by this backend.
+	 * 
+	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+	 */
+	public abstract <K, V> KvState<K, V, Backend> createKvState(
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+			V defaultValue) throws Exception;
+	
+	
+	// ------------------------------------------------------------------------
+	//  storing state for a checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
+	 * is closed, it returns a state handle that can retrieve the state back.
+	 * 
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return An output stream that writes state for the given checkpoint.
+	 * 
+	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 */
+	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
+			long checkpointID, long timestamp) throws Exception;
+
+
+	/**
+	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
+	 * 
+	 * @param state The state to be checkpointed.
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @param <S> The type of the state.
+	 * 
+	 * @return A state handle that can retrieve the checkpointed state.
+	 * 
+	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
+	 */
+	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+			S state, long checkpointID, long timestamp) throws Exception;
+	
+	
+	// ------------------------------------------------------------------------
+	//  Checkpoint state output stream
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+	 */
+	public static abstract class CheckpointStateOutputStream extends OutputStream {
+
+		/**
+		 * Closes the stream and gets a state handle that can create an input stream
+		 * producing the data written to this stream.
+		 * 
+		 * @return A state handle that can create an input stream producing the data written to this stream.
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
new file mode 100644
index 0000000..ad87eae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A factory to create a specific state backend. The state backend creation gets a Configuration
+ * object that can be used to read further config values.
+ * 
+ * @param <T> The type of the state backend created.
+ */
+public interface StateBackendFactory<T extends StateBackend<T>> {
+
+	/**
+	 * Creates the state backend, optionally using the given configuration.
+	 * 
+	 * @param config The Flink configuration (loaded by the TaskManager).
+	 * @return The created state backend. 
+	 * 
+	 * @throws Exception Exceptions during instantiation can be forwarded.
+	 */
+	StateBackend<T> createFromConfig(Configuration config) throws Exception;
+}