Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:57 UTC

[07/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index f5e3618..7e4eded 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import com.google.common.collect.Iterables;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
@@ -47,27 +49,36 @@ public class TaskState implements StateObject {
 	/** handles to non-partitioned states, subtaskindex -> subtaskstate */
 	private final Map<Integer, SubtaskState> subtaskStates;
 
-	/** handles to partitioned states, subtaskindex -> keyed state */
+	/** handles to partitionable states, subtaskindex -> partitionable state */
+	private final Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStates;
+
+	/** handles to key-partitioned states, subtaskindex -> keyed state */
 	private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles;
 
+
 	/** parallelism of the operator when it was checkpointed */
 	private final int parallelism;
 
 	/** maximum parallelism of the operator when the job was first created */
 	private final int maxParallelism;
 
-	public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
+	private final int chainLength;
+
+	public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism, int chainLength) {
 		Preconditions.checkArgument(
 				parallelism <= maxParallelism,
 				"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
+		Preconditions.checkArgument(chainLength > 0, "There has to be at least one operator in the operator chain.");
 
 		this.jobVertexID = jobVertexID;
 
 		this.subtaskStates = new HashMap<>(parallelism);
+		this.partitionableStates = new HashMap<>(parallelism);
 		this.keyGroupsStateHandles = new HashMap<>(parallelism);
 
 		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
+		this.chainLength = chainLength;
 	}
 
 	public JobVertexID getJobVertexID() {
@@ -85,6 +96,20 @@ public class TaskState implements StateObject {
 		}
 	}
 
+	public void putPartitionableState(
+			int subtaskIndex,
+			ChainedStateHandle<OperatorStateHandle> partitionableState) {
+
+		Preconditions.checkNotNull(partitionableState);
+
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+					" exceeds the maximum number of sub tasks " + subtaskStates.size());
+		} else {
+			partitionableStates.put(subtaskIndex, partitionableState);
+		}
+	}
+
 	public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle keyGroupsStateHandle) {
 		Preconditions.checkNotNull(keyGroupsStateHandle);
 
@@ -106,6 +131,15 @@ public class TaskState implements StateObject {
 		}
 	}
 
+	public ChainedStateHandle<OperatorStateHandle> getPartitionableState(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+					" exceeds the maximum number of sub tasks " + subtaskStates.size());
+		} else {
+			return partitionableStates.get(subtaskIndex);
+		}
+	}
+
 	public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
 		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
 			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
@@ -131,6 +165,10 @@ public class TaskState implements StateObject {
 		return maxParallelism;
 	}
 
+	public int getChainLength() {
+		return chainLength;
+	}
+
 	public Collection<KeyGroupsStateHandle> getKeyGroupStates() {
 		return keyGroupsStateHandles.values();
 	}
@@ -147,7 +185,7 @@ public class TaskState implements StateObject {
 	@Override
 	public void discardState() throws Exception {
 		StateUtil.bestEffortDiscardAllStateObjects(
-				Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+				Iterables.concat(subtaskStates.values(), partitionableStates.values(), keyGroupsStateHandles.values()));
 	}
 
 
@@ -156,11 +194,19 @@ public class TaskState implements StateObject {
 		long result = 0L;
 
 		for (int i = 0; i < parallelism; i++) {
-			if (subtaskStates.get(i) != null) {
-				result += subtaskStates.get(i).getStateSize();
+			SubtaskState subtaskState = subtaskStates.get(i);
+			if (subtaskState != null) {
+				result += subtaskState.getStateSize();
+			}
+
+			ChainedStateHandle<OperatorStateHandle> partitionableState = partitionableStates.get(i);
+			if (partitionableState != null) {
+				result += partitionableState.getStateSize();
 			}
-			if (keyGroupsStateHandles.get(i) != null) {
-				result += keyGroupsStateHandles.get(i).getStateSize();
+
+			KeyGroupsStateHandle keyGroupsState = keyGroupsStateHandles.get(i);
+			if (keyGroupsState != null) {
+				result += keyGroupsState.getStateSize();
 			}
 		}
 
@@ -172,8 +218,11 @@ public class TaskState implements StateObject {
 		if (obj instanceof TaskState) {
 			TaskState other = (TaskState) obj;
 
-			return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
-				subtaskStates.equals(other.subtaskStates) && keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
+			return jobVertexID.equals(other.jobVertexID)
+					&& parallelism == other.parallelism
+					&& subtaskStates.equals(other.subtaskStates)
+					&& partitionableStates.equals(other.partitionableStates)
+					&& keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
 		} else {
 			return false;
 		}
@@ -181,13 +230,7 @@ public class TaskState implements StateObject {
 
 	@Override
 	public int hashCode() {
-		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, keyGroupsStateHandles);
-	}
-
-	@Override
-	public void close() throws IOException {
-		StateUtil.bestEffortCloseAllStateObjects(
-				Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, partitionableStates, keyGroupsStateHandles);
 	}
 
 	public Map<Integer, SubtaskState> getSubtaskStates() {
@@ -197,4 +240,8 @@ public class TaskState implements StateObject {
 	public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() {
 		return Collections.unmodifiableMap(keyGroupsStateHandles);
 	}
+
+	public Map<Integer, ChainedStateHandle<OperatorStateHandle>> getPartitionableStates() {
+		return partitionableStates;
+	}
 }
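
For illustration, a minimal sketch (not taken from this commit) of how a TaskState now carries per-subtask partitionable state alongside the non-partitioned and keyed state; the two OperatorStateHandle arguments are assumed to have been produced elsewhere, e.g. by an operator state backend:

    static TaskState buildTaskState(OperatorStateHandle handleForSubtask0,
                                    OperatorStateHandle handleForSubtask1) {
        // parallelism 2, max parallelism 128, one operator in the chain
        TaskState taskState = new TaskState(new JobVertexID(), 2, 128, 1);

        // one chained partitionable handle per subtask (chain length 1 here)
        taskState.putPartitionableState(0, ChainedStateHandle.wrapSingleHandle(handleForSubtask0));
        taskState.putPartitionableState(1, ChainedStateHandle.wrapSingleHandle(handleForSubtask1));

        return taskState;
    }

As the hunks above show, getStateSize(), discardState(), equals() and hashCode() of the resulting TaskState now take these partitionable handles into account as well.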

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index f07f44f..536062a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -35,6 +36,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -51,6 +53,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
 	private static final byte FILE_STREAM_STATE_HANDLE = 2;
 	private static final byte KEY_GROUPS_HANDLE = 3;
+	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 
 
 	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
@@ -75,8 +78,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				int parallelism = taskState.getParallelism();
 				dos.writeInt(parallelism);
 				dos.writeInt(taskState.getMaxParallelism());
+				dos.writeInt(taskState.getChainLength());
 
-				// Sub task states
+				// Sub task non-partitionable states
 				Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
 				dos.writeInt(subtaskStateMap.size());
 				for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
@@ -93,7 +97,22 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 					dos.writeLong(subtaskState.getDuration());
 				}
 
+				// Sub task partitionable states
+				Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStatesMap = taskState.getPartitionableStates();
+				dos.writeInt(partitionableStatesMap.size());
 
+				for (Map.Entry<Integer, ChainedStateHandle<OperatorStateHandle>> entry : partitionableStatesMap.entrySet()) {
+					dos.writeInt(entry.getKey());
+
+					ChainedStateHandle<OperatorStateHandle> chainedStateHandle = entry.getValue();
+					dos.writeInt(chainedStateHandle.getLength());
+					for (int j = 0; j < chainedStateHandle.getLength(); ++j) {
+						OperatorStateHandle stateHandle = chainedStateHandle.get(j);
+						serializePartitionableStateHandle(stateHandle, dos);
+					}
+				}
+
+				// Keyed state
 				Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles = taskState.getKeyGroupsStateHandles();
 				dos.writeInt(keyGroupsStateHandles.size());
 				for (Map.Entry<Integer, KeyGroupsStateHandle> entry : keyGroupsStateHandles.entrySet()) {
@@ -119,9 +138,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
 			int parallelism = dis.readInt();
 			int maxParallelism = dis.readInt();
+			int chainLength = dis.readInt();
 
 			// Add task state
-			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism);
+			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
 			taskStates.add(taskState);
 
 			// Sub task states
@@ -142,6 +162,24 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				taskState.putState(subtaskIndex, subtaskState);
 			}
 
+			int numPartitionableOpStates = dis.readInt();
+
+			for (int j = 0; j < numPartitionableOpStates; j++) {
+				int subtaskIndex = dis.readInt();
+				int chainedStateHandleSize = dis.readInt();
+				List<OperatorStateHandle> streamStateHandleList = new ArrayList<>(chainedStateHandleSize);
+
+				for (int k = 0; k < chainedStateHandleSize; ++k) {
+					OperatorStateHandle streamStateHandle = deserializePartitionableStateHandle(dis);
+					streamStateHandleList.add(streamStateHandle);
+				}
+
+				ChainedStateHandle<OperatorStateHandle> chainedStateHandle =
+						new ChainedStateHandle<>(streamStateHandleList);
+
+				taskState.putPartitionableState(subtaskIndex, chainedStateHandle);
+			}
+
 			// Key group states
 			int numKeyGroupStates = dis.readInt();
 			for (int j = 0; j < numKeyGroupStates; j++) {
@@ -157,7 +195,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		return new SavepointV1(checkpointId, taskStates);
 	}
 
-	public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	public static void serializeKeyGroupStateHandle(
+			KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
 		if (stateHandle != null) {
 			dos.writeByte(KEY_GROUPS_HANDLE);
 			dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
@@ -172,10 +212,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 	}
 
 	public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
-		int type = dis.readByte();
+		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
-		} else {
+		} else if (KEY_GROUPS_HANDLE == type) {
 			int startKeyGroup = dis.readInt();
 			int numKeyGroups = dis.readInt();
 			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
@@ -186,6 +226,53 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
 			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
 			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type);
+		}
+	}
+
+	public static void serializePartitionableStateHandle(
+			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle != null) {
+			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+			Map<String, long[]> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets();
+			dos.writeInt(partitionOffsetsMap.size());
+			for (Map.Entry<String, long[]> entry : partitionOffsetsMap.entrySet()) {
+				dos.writeUTF(entry.getKey());
+				long[] offsets = entry.getValue();
+				dos.writeInt(offsets.length);
+				for (int i = 0; i < offsets.length; ++i) {
+					dos.writeLong(offsets[i]);
+				}
+			}
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
+		}
+	}
+
+	public static OperatorStateHandle deserializePartitionableStateHandle(
+			DataInputStream dis) throws IOException {
+
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+			int mapSize = dis.readInt();
+			Map<String, long[]> offsetsMap = new HashMap<>(mapSize);
+			for (int i = 0; i < mapSize; ++i) {
+				String key = dis.readUTF();
+				long[] offsets = new long[dis.readInt()];
+				for (int j = 0; j < offsets.length; ++j) {
+					offsets[j] = dis.readLong();
+				}
+				offsetsMap.put(key, offsets);
+			}
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new OperatorStateHandle(stateHandle, offsetsMap);
+		} else {
+			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
 		}
 	}
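
For illustration, a minimal round-trip sketch (not part of the commit; it assumes a caller in the same package, since SavepointV1Serializer is package-private, and that 'delegate' is any StreamStateHandle, e.g. one produced by a checkpoint stream) showing the new layout for partitionable handles: a type byte, the state-name-to-offsets map, then the delegate stream handle. Uses java.io and java.util plus the flink-runtime state classes imported above.

    static OperatorStateHandle roundTrip(StreamStateHandle delegate) throws IOException {
        // per-partition offsets into the delegate stream for one named state
        Map<String, long[]> partitionOffsets = new HashMap<>();
        partitionOffsets.put("bufferedElements", new long[] {0L, 42L});

        OperatorStateHandle original = new OperatorStateHandle(delegate, partitionOffsets);

        // serialize: writes PARTITIONABLE_OPERATOR_STATE_HANDLE, the offsets map, then the delegate
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SavepointV1Serializer.serializePartitionableStateHandle(original, new DataOutputStream(out));

        // deserialize back into an equivalent OperatorStateHandle
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
        return SavepointV1Serializer.deserializePartitionableStateHandle(in);
    }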
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index ca976e4..7bbdb2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.SerializedValue;
 
@@ -100,6 +101,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** Handle to the key-grouped state of the head operator in the chain */
 	private final List<KeyGroupsStateHandle> keyGroupState;
 
+	private final List<Collection<OperatorStateHandle>> partitionableOperatorState;
+
 	/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
 	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
@@ -107,26 +110,27 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * Constructs a task deployment descriptor.
 	 */
 	public TaskDeploymentDescriptor(
-			JobID jobID,
-			String jobName,
-			JobVertexID vertexID,
-			ExecutionAttemptID executionId,
-			SerializedValue<ExecutionConfig> serializedExecutionConfig,
-			String taskName,
-			int numberOfKeyGroups,
-			int indexInSubtaskGroup,
-			int numberOfSubtasks,
-			int attemptNumber,
-			Configuration jobConfiguration,
-			Configuration taskConfiguration,
-			String invokableClassName,
-			List<ResultPartitionDeploymentDescriptor> producedPartitions,
-			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles,
-			List<URL> requiredClasspaths,
-			int targetSlotNumber,
-			ChainedStateHandle<StreamStateHandle> operatorState,
-			List<KeyGroupsStateHandle> keyGroupState) {
+		JobID jobID,
+		String jobName,
+		JobVertexID vertexID,
+		ExecutionAttemptID executionId,
+		SerializedValue<ExecutionConfig> serializedExecutionConfig,
+		String taskName,
+		int numberOfKeyGroups,
+		int indexInSubtaskGroup,
+		int numberOfSubtasks,
+		int attemptNumber,
+		Configuration jobConfiguration,
+		Configuration taskConfiguration,
+		String invokableClassName,
+		List<ResultPartitionDeploymentDescriptor> producedPartitions,
+		List<InputGateDeploymentDescriptor> inputGates,
+		List<BlobKey> requiredJarFiles,
+		List<URL> requiredClasspaths,
+		int targetSlotNumber,
+		ChainedStateHandle<StreamStateHandle> operatorState,
+		List<KeyGroupsStateHandle> keyGroupState,
+		List<Collection<OperatorStateHandle>> partitionableOperatorStateHandles) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
 		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -153,6 +157,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.targetSlotNumber = targetSlotNumber;
 		this.operatorState = operatorState;
 		this.keyGroupState = keyGroupState;
+		this.partitionableOperatorState = partitionableOperatorStateHandles;
 	}
 
 	public TaskDeploymentDescriptor(
@@ -195,6 +200,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			requiredClasspaths,
 			targetSlotNumber,
 			null,
+			null,
 			null);
 	}
 
@@ -347,4 +353,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	public List<KeyGroupsStateHandle> getKeyGroupState() {
 		return keyGroupState;
 	}
+
+	public List<Collection<OperatorStateHandle>> getPartitionableOperatorState() {
+		return partitionableOperatorState;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 273c0d9..f6cde95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,13 +34,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -187,12 +184,8 @@ public interface Environment {
 	 * the checkpoint with the give checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
-	 * @param checkpointId
-	 *             The ID of the checkpoint.
-	 * @param chainedStateHandle
-	 *             Handle for the chained operator state
-	 * @param keyGroupStateHandles
-	 *             Handles for key group state
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointStateHandles All state handles for the checkpointed state
 	 * @param synchronousDurationMillis
 	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
 	 * @param asynchronousDurationMillis
@@ -204,8 +197,7 @@ public interface Environment {
 	 */
 	void acknowledgeCheckpoint(
 			long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			CheckpointStateHandles checkpointStateHandles,
 			long synchronousDurationMillis,
 			long asynchronousDurationMillis,
 			long bytesBufferedInAlignment,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 912ff10..b92e3af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -56,6 +58,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -134,6 +137,8 @@ public class Execution {
 
 	private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
 
+	private List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle;
+
 	private List<KeyGroupsStateHandle> keyGroupsStateHandles;
 	
 
@@ -223,6 +228,10 @@ public class Execution {
 		return keyGroupsStateHandles;
 	}
 
+	public List<Collection<OperatorStateHandle>> getChainedPartitionableStateHandle() {
+		return chainedPartitionableStateHandle;
+	}
+
 	public boolean isFinished() {
 		return state.isTerminal();
 	}
@@ -246,18 +255,19 @@ public class Execution {
 	 * Sets the initial state for the execution. The serialized state is then shipped via the
 	 * {@link TaskDeploymentDescriptor} to the TaskManagers.
 	 *
-	 * @param chainedStateHandle Chained operator state
-	 * @param keyGroupsStateHandles Key-group state (= partitioned state)
+	 * @param checkpointStateHandles all checkpointed operator state
 	 */
-	public void setInitialState(
-		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupsStateHandles) {
+	public void setInitialState(CheckpointStateHandles checkpointStateHandles, List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle) {
 
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
 		}
-		this.chainedStateHandle = chainedStateHandle;
-		this.keyGroupsStateHandles = keyGroupsStateHandles;
+
+		if(checkpointStateHandles != null) {
+			this.chainedStateHandle = checkpointStateHandles.getNonPartitionedStateHandles();
+			this.chainedPartitionableStateHandle = chainedPartitionableStateHandle;
+			this.keyGroupsStateHandles = checkpointStateHandles.getKeyGroupsStateHandle();
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -385,6 +395,7 @@ public class Execution {
 				slot,
 				chainedStateHandle,
 				keyGroupsStateHandles,
+				chainedPartitionableStateHandle,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c3fa0b..6023205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -56,10 +56,8 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a8d5ee4..4837803 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -53,6 +54,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -629,6 +631,7 @@ public class ExecutionVertex {
 			SimpleSlot targetSlot,
 			ChainedStateHandle<StreamStateHandle> operatorState,
 			List<KeyGroupsStateHandle> keyGroupStates,
+			List<Collection<OperatorStateHandle>> partitionableOperatorStateHandle,
 			int attemptNumber) {
 
 		// Produced intermediate results
@@ -681,7 +684,8 @@ public class ExecutionVertex {
 			classpaths,
 			targetSlot.getRoot().getSlotNumber(),
 			operatorState,
-			keyGroupStates);
+			keyGroupStates,
+			partitionableOperatorStateHandle);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 9ddfdf7..55e3e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -33,11 +35,16 @@ public interface StatefulTask {
 	/**
 	 * Sets the initial state of the operator, upon recovery. The initial state is typically
 	 * a snapshot of the state from a previous execution.
-	 * 
+	 *
+	 * TODO this should use @{@link org.apache.flink.runtime.state.CheckpointStateHandles} after redoing chained state.
+	 *
 	 * @param chainedState Handle for the chained operator states.
 	 * @param keyGroupsState Handle for key group states.
 	 */
-	void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
+	void setInitialState(
+		ChainedStateHandle<StreamStateHandle> chainedState,
+		List<KeyGroupsStateHandle> keyGroupsState,
+		List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception;
 
 	/**
 	 * This method is called to trigger a checkpoint, asynchronously by the checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 72396eb..e95e7b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -20,11 +20,7 @@ package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.util.List;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -32,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
  * individual task is completed.
- * 
+ * <p>
  * <p>This message may carry the handle to the task's chained operator state and the key group
  * state.
  */
@@ -40,9 +36,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	private static final long serialVersionUID = -7606214777192401493L;
 
-	private final ChainedStateHandle<StreamStateHandle> stateHandle;
 
-	private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+	private final CheckpointStateHandles checkpointStateHandles;
 
 	/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
 	private final long synchronousDurationMillis;
@@ -62,24 +57,22 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
 			long checkpointId) {
-		this(job, taskExecutionId, checkpointId, null, null);
+		this(job, taskExecutionId, checkpointId, null);
 	}
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
 			long checkpointId,
-			ChainedStateHandle<StreamStateHandle> state,
-			List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
-		this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+			CheckpointStateHandles checkpointStateHandles) {
+		this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L);
 	}
 
 	public AcknowledgeCheckpoint(
 			JobID job,
 			ExecutionAttemptID taskExecutionId,
 			long checkpointId,
-			ChainedStateHandle<StreamStateHandle> state,
-			List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+			CheckpointStateHandles checkpointStateHandles,
 			long synchronousDurationMillis,
 			long asynchronousDurationMillis,
 			long bytesBufferedInAlignment,
@@ -87,9 +80,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 		super(job, taskExecutionId, checkpointId);
 
-		// these may be null in cases where the operator has no state
-		this.stateHandle = state;
-		this.keyGroupsStateHandle = keyGroupStateAndSizes;
+		this.checkpointStateHandles = checkpointStateHandles;
 
 		// these may be "-1", in case the values are unknown or not set
 		checkArgument(synchronousDurationMillis >= -1);
@@ -107,12 +98,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 	//  properties
 	// ------------------------------------------------------------------------
 
-	public ChainedStateHandle<StreamStateHandle> getStateHandle() {
-		return stateHandle;
-	}
-
-	public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
-		return keyGroupsStateHandle;
+	public CheckpointStateHandles getCheckpointStateHandles() {
+		return checkpointStateHandles;
 	}
 
 	public long getSynchronousDurationMillis() {
@@ -134,31 +121,33 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public int hashCode() {
-		return super.hashCode();
-	}
-
-	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
-			return true ;
+			return true;
 		}
-		else if (o instanceof AcknowledgeCheckpoint) {
-			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
-			return super.equals(o) &&
-					(this.stateHandle == null ? that.stateHandle == null : 
-							(that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
-					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : 
-							(that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
+		if (!(o instanceof AcknowledgeCheckpoint)) {
+			return false;
 		}
-		else {
+		if (!super.equals(o)) {
 			return false;
 		}
+
+		AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
+		return checkpointStateHandles != null ?
+				checkpointStateHandles.equals(that.checkpointStateHandles) : that.checkpointStateHandles == null;
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = super.hashCode();
+		result = 31 * result + (checkpointStateHandles != null ? checkpointStateHandles.hashCode() : 0);
+		return result;
 	}
 
 	@Override
 	public String toString() {
-		return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s keyGroupState=%s",
-				getCheckpointId(), getJob(), getTaskExecutionId(), stateHandle, keyGroupsStateHandle);
+		return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), checkpointStateHandles);
 	}
 }
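
For illustration, a minimal sketch (not part of the commit) of the two acknowledge paths after this change; jobId, attemptId, checkpointId and stateHandles are assumed to be whatever the task has at hand, with stateHandles being null for a stateless task:

    static AcknowledgeCheckpoint ack(JobID jobId, ExecutionAttemptID attemptId,
                                     long checkpointId, CheckpointStateHandles stateHandles) {
        return stateHandles == null
                // stateless task: no handles to report
                ? new AcknowledgeCheckpoint(jobId, attemptId, checkpointId)
                // stateful task: all handles travel in one CheckpointStateHandles object
                : new AcknowledgeCheckpoint(jobId, attemptId, checkpointId, stateHandles);
    }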

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
deleted file mode 100644
index 5966c95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-/**
- * A simple base for closable handles.
- *
- * Offers to register a stream (or other closable object) that close calls are delegated to if
- * the handle is closed or was already closed.
- */
-public abstract class AbstractCloseableHandle implements Closeable, StateObject {
-
-	/** Serial Version UID must be constant to maintain format compatibility */
-	private static final long serialVersionUID = 1L;
-
-	/** To atomically update the "closable" field without needing to add a member class like "AtomicBoolean */
-	private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
-			AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
-
-	// ------------------------------------------------------------------------
-
-	/** The closeable to close if this handle is closed late */
-	private transient volatile Closeable toClose;
-
-	/** Flag to remember if this handle was already closed */
-	@SuppressWarnings("unused") // this field is actually updated, but via the "CLOSER" updater
-	private transient volatile int isClosed;
-
-	// ------------------------------------------------------------------------
-
-	protected final void registerCloseable(Closeable toClose) throws IOException {
-		if (toClose == null) {
-			return;
-		}
-
-		// NOTE: The order of operations matters here:
-		// (1) first setting the closeable
-		// (2) checking the flag.
-		// Because the order in the {@link #close()} method is the opposite, and
-		// both variables are volatile (reordering barriers), we can be sure that
-		// one of the methods always notices the effect of a concurrent call to the
-		// other method.
-
-		this.toClose = toClose;
-
-		// check if we were closed early
-		if (this.isClosed != 0) {
-			toClose.close();
-			throw new IOException("handle is closed");
-		}
-	}
-
-	/**
-	 * Closes the handle.
-	 *
-	 * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
-	 * then this will be closes.
-	 *
-	 * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
-	 * it will immediately be closed and that method will throw an exception.
-	 *
-	 * @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
-	 *                     are forwarded.
-	 *
-	 * @see #registerCloseable(Closeable)
-	 */
-	@Override
-	public final void close() throws IOException {
-		// NOTE: The order of operations matters here:
-		// (1) first setting the closed flag
-		// (2) checking whether there is already a closeable
-		// Because the order in the {@link #registerCloseable(Closeable)} method is the opposite, and
-		// both variables are volatile (reordering barriers), we can be sure that
-		// one of the methods always notices the effect of a concurrent call to the
-		// other method.
-
-		if (CLOSER.compareAndSet(this, 0, 1)) {
-			final Closeable toClose = this.toClose;
-			if (toClose != null) {
-				this.toClose = null;
-				toClose.close();
-			}
-		}
-	}
-
-	/**
-	 * Checks whether this handle has been closed.
-	 *
-	 * @return True is the handle is closed, false otherwise.
-	 */
-	public boolean isClosed() {
-		return isClosed != 0;
-	}
-
-	/**
-	 * This method checks whether the handle is closed and throws an exception if it is closed.
-	 * If the handle is not closed, this method does nothing.
-	 *
-	 * @throws IOException Thrown, if the handle has been closed.
-	 */
-	public void ensureNotClosed() throws IOException {
-		if (isClosed != 0) {
-			throw new IOException("handle is closed");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
new file mode 100644
index 0000000..7ca3b38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base implementation of KeyedStateBackend. The state can be checkpointed
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ *
+ * @param <K> Type of the key by which state is keyed.
+ */
+public abstract class AbstractKeyedStateBackend<K>
+		implements KeyedStateBackend<K>, SnapshotProvider<KeyGroupsStateHandle>, Closeable {
+
+	/** {@link TypeSerializer} for our key. */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** The currently active key. */
+	protected K currentKey;
+
+	/** The key group of the currently active key */
+	private int currentKeyGroup;
+
+	/** So that we can give out state when the user uses the same key. */
+	protected HashMap<String, KvState<?>> keyValueStatesByName;
+
+	/** For caching the last accessed partitioned state */
+	private String lastName;
+
+	@SuppressWarnings("rawtypes")
+	private KvState lastState;
+
+	/** The number of key-groups aka max parallelism */
+	protected final int numberOfKeyGroups;
+
+	/** Range of key-groups for which this backend is responsible */
+	protected final KeyGroupRange keyGroupRange;
+
+	/** KvStateRegistry helper for this task */
+	protected final TaskKvStateRegistry kvStateRegistry;
+
+	/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+	protected ClosableRegistry cancelStreamRegistry;
+
+	protected final ClassLoader userCodeClassLoader;
+
+	public AbstractKeyedStateBackend(
+			TaskKvStateRegistry kvStateRegistry,
+			TypeSerializer<K> keySerializer,
+			ClassLoader userCodeClassLoader,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange) {
+
+		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+		this.cancelStreamRegistry = new ClosableRegistry();
+	}
+
+	/**
+	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
+	 * checkpoint data.
+	 *
+	 */
+	@Override
+	public void dispose() {
+		if (kvStateRegistry != null) {
+			kvStateRegistry.unregisterAll();
+		}
+
+		lastName = null;
+		lastState = null;
+		keyValueStatesByName = null;
+	}
+
+	/**
+	 * Creates and returns a new {@link ValueState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 */
+	protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ListState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ReducingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state	 *
+	 */
+	protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	public void setCurrentKey(K newKey) {
+		this.currentKey = newKey;
+		this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	public int getCurrentKeyGroupIndex() {
+		return currentKeyGroup;
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	public int getNumberOfKeyGroups() {
+		return numberOfKeyGroups;
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
+
+	/**
+	 * @see KeyedStateBackend
+	 */
+	@Override
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		Preconditions.checkNotNull(namespace, "Namespace");
+		Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
+
+		if (keySerializer == null) {
+			throw new RuntimeException("State key serializer has not been configured in the config. " +
+					"This operation cannot use partitioned state.");
+		}
+		
+		if (!stateDescriptor.isSerializerInitialized()) {
+			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		if (keyValueStatesByName == null) {
+			keyValueStatesByName = new HashMap<>();
+		}
+
+		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+			lastState.setCurrentNamespace(namespace);
+			return (S) lastState;
+		}
+
+		KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+		if (previous != null) {
+			lastState = previous;
+			lastState.setCurrentNamespace(namespace);
+			lastName = stateDescriptor.getName();
+			return (S) previous;
+		}
+
+		// create a new blank key/value state
+		S state = stateDescriptor.bind(new StateBackend() {
+			@Override
+			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+				return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+				return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+				return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
+			}
+
+		});
+
+		KvState kvState = (KvState) state;
+
+		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+
+		lastName = stateDescriptor.getName();
+		lastState = kvState;
+
+		kvState.setCurrentNamespace(namespace);
+
+		// Publish queryable state
+		if (stateDescriptor.isQueryable()) {
+			if (kvStateRegistry == null) {
+				throw new IllegalStateException("State backend has not been initialized for job.");
+			}
+
+			String name = stateDescriptor.getQueryableStateName();
+			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
+		}
+
+		return state;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked,rawtypes")
+	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		if (stateDescriptor instanceof ReducingStateDescriptor) {
+			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
+			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			Object result = null;
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Object sourceValue = state.get();
+				if (result == null) {
+					result = state.get();
+				} else if (sourceValue != null) {
+					result = reduceFn.reduce(result, sourceValue);
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			if (result != null) {
+				state.add(result);
+			}
+		} else if (stateDescriptor instanceof ListStateDescriptor) {
+			ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			List<Object> result = new ArrayList<>();
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Iterable<Object> sourceValue = state.get();
+				if (sourceValue != null) {
+					for (Object o : sourceValue) {
+						result.add(o);
+					}
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			for (Object o : result) {
+				state.add(o);
+			}
+		} else {
+			throw new RuntimeException("Cannot merge states for " + stateDescriptor);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		cancelStreamRegistry.close();
+	}
+}
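
For illustration, a minimal sketch (not part of the commit) of how a task-side caller uses the caching getPartitionedState() implemented above; 'backend' is assumed to be some concrete AbstractKeyedStateBackend<String>, and the runtime's VoidNamespace/VoidNamespaceSerializer placeholders stand in for a real namespace:

    static void bumpCounter(AbstractKeyedStateBackend<String> backend, String key) throws Exception {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class, 0L);

        // also derives the key group index for 'key'
        backend.setCurrentKey(key);

        // repeated look-ups of "count" hit the lastName/lastState cache
        ValueState<Long> count = backend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        count.update(count.value() + 1);
    }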

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 0d2bf45..c2e665b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -36,31 +37,33 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
 	 * that should end up in a checkpoint.
 	 *
-	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
 	 * @param operatorIdentifier An identifier of the operator for which we create streams.
 	 */
 	public abstract CheckpointStreamFactory createStreamFactory(
 			JobID jobId,
-			String operatorIdentifier) throws IOException;
+			String operatorIdentifier
+	) throws IOException;
 
 	/**
-	 * Creates a new {@link KeyedStateBackend} that is responsible for keeping keyed state
+	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for keeping keyed state
 	 * and can be checkpointed to checkpoint streams.
 	 */
-	public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
+	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception;
+			TaskKvStateRegistry kvStateRegistry
+	) throws Exception;
 
 	/**
-	 * Creates a new {@link KeyedStateBackend} that restores its state from the given list
+	 * Creates a new {@link AbstractKeyedStateBackend} that restores its state from the given list
 	 * {@link KeyGroupsStateHandle KeyGroupStateHandles}.
 	 */
-	public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+	public abstract <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
 			Environment env,
 			JobID jobID,
 			String operatorIdentifier,
@@ -68,6 +71,30 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
-			TaskKvStateRegistry kvStateRegistry) throws Exception;
+			TaskKvStateRegistry kvStateRegistry
+	) throws Exception;
 
+
+	/**
+	 * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator
+	 * state in checkpoint streams.
+	 */
+	public OperatorStateBackend createOperatorStateBackend(
+			Environment env,
+			String operatorIdentifier
+	) throws Exception {
+		return new DefaultOperatorStateBackend();
+	}
+
+	/**
+	 * Creates a new {@link OperatorStateBackend} that restores its state from the given collection of
+	 * {@link OperatorStateHandle}.
+	 */
+	public OperatorStateBackend restoreOperatorStateBackend(
+			Environment env,
+			String operatorIdentifier,
+			Collection<OperatorStateHandle> restoreSnapshots
+	) throws Exception {
+		return new DefaultOperatorStateBackend(restoreSnapshots);
+	}
 }
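
A hedged sketch of how the two new operator state hooks would typically be driven from task-side code, choosing between first execution and recovery; the helper and its parameters are illustrative only, not part of this change:

    import org.apache.flink.runtime.execution.Environment;
    import org.apache.flink.runtime.state.AbstractStateBackend;
    import org.apache.flink.runtime.state.OperatorStateBackend;
    import org.apache.flink.runtime.state.OperatorStateHandle;

    import java.util.Collection;

    public class OperatorStateBackendSketch {

        // fresh backend on the first execution, restored backend when snapshots are available
        public static OperatorStateBackend createOrRestore(
                AbstractStateBackend stateBackend,
                Environment env,
                String operatorIdentifier,
                Collection<OperatorStateHandle> restoredSnapshots) throws Exception {

            return (restoredSnapshots == null || restoredSnapshots.isEmpty())
                    ? stateBackend.createOperatorStateBackend(env, operatorIdentifier)
                    : stateBackend.restoreOperatorStateBackend(env, operatorIdentifier, restoredSnapshots);
        }
    }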

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 74057ee..c6904c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Handle to the non-partitioned states for the operators in an operator chain.
+ * Handle that groups the state handles of the operators in an operator chain.
  */
 public class ChainedStateHandle<T extends StateObject> implements StateObject {
 
@@ -123,9 +123,4 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject {
 	public static <T extends StateObject> ChainedStateHandle<T> wrapSingleHandle(T stateHandleToWrap) {
 		return new ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap));
 	}
-
-	@Override
-	public void close() throws IOException {
-		StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
new file mode 100644
index 0000000..9daf963
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Container that bundles all state handles from the different state types of a checkpointed state.
+ * TODO This will be changed in the future if we get rid of chained state and instead connect state directly to individual operators in a chain.
+ */
+public class CheckpointStateHandles implements Serializable {
+
+	private static final long serialVersionUID = 3252351989995L;
+
+	private final ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles;
+
+	private final ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles;
+
+	private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+
+	public CheckpointStateHandles(
+			ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
+			ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
+			List<KeyGroupsStateHandle> keyGroupsStateHandle) {
+
+		this.nonPartitionedStateHandles = nonPartitionedStateHandles;
+		this.partitioneableStateHandles = partitioneableStateHandles;
+		this.keyGroupsStateHandle = keyGroupsStateHandle;
+	}
+
+	public ChainedStateHandle<StreamStateHandle> getNonPartitionedStateHandles() {
+		return nonPartitionedStateHandles;
+	}
+
+	public ChainedStateHandle<OperatorStateHandle> getPartitioneableStateHandles() {
+		return partitioneableStateHandles;
+	}
+
+	public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
+		return keyGroupsStateHandle;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof CheckpointStateHandles)) {
+			return false;
+		}
+
+		CheckpointStateHandles that = (CheckpointStateHandles) o;
+
+		if (nonPartitionedStateHandles != null ?
+				!nonPartitionedStateHandles.equals(that.nonPartitionedStateHandles)
+				: that.nonPartitionedStateHandles != null) {
+			return false;
+		}
+
+		if (partitioneableStateHandles != null ?
+				!partitioneableStateHandles.equals(that.partitioneableStateHandles)
+				: that.partitioneableStateHandles != null) {
+			return false;
+		}
+		return keyGroupsStateHandle != null ?
+				keyGroupsStateHandle.equals(that.keyGroupsStateHandle) : that.keyGroupsStateHandle == null;
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = nonPartitionedStateHandles != null ? nonPartitionedStateHandles.hashCode() : 0;
+		result = 31 * result + (partitioneableStateHandles != null ? partitioneableStateHandles.hashCode() : 0);
+		result = 31 * result + (keyGroupsStateHandle != null ? keyGroupsStateHandle.hashCode() : 0);
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointStateHandles{" +
+				"nonPartitionedStateHandles=" + nonPartitionedStateHandles +
+				", partitioneableStateHandles=" + partitioneableStateHandles +
+				", keyGroupsStateHandle=" + keyGroupsStateHandle +
+				'}';
+	}
+}
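
A hedged sketch of how the container above could be assembled on the task side when reporting a checkpoint; the bundling helper is illustrative, and the actual acknowledgement path is outside this hunk:

    import org.apache.flink.runtime.state.ChainedStateHandle;
    import org.apache.flink.runtime.state.CheckpointStateHandles;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    import org.apache.flink.runtime.state.OperatorStateHandle;
    import org.apache.flink.runtime.state.StreamStateHandle;

    import java.util.Collections;
    import java.util.List;

    public class CheckpointAckSketch {

        // bundles the three state flavours reported by one subtask
        public static CheckpointStateHandles bundle(
                StreamStateHandle nonPartitioned,
                OperatorStateHandle partitionable,
                List<KeyGroupsStateHandle> keyedState) {

            return new CheckpointStateHandles(
                    ChainedStateHandle.wrapSingleHandle(nonPartitioned),
                    ChainedStateHandle.wrapSingleHandle(partitionable),
                    keyedState == null ? Collections.<KeyGroupsStateHandle>emptyList() : keyedState);
        }
    }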

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
new file mode 100644
index 0000000..26d6192
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClosableRegistry implements Closeable {
+
+	private final Set<Closeable> registeredCloseables;
+	private boolean closed;
+
+	public ClosableRegistry() {
+		this.registeredCloseables = new HashSet<>();
+		this.closed = false;
+	}
+
+	public boolean registerClosable(Closeable closeable) {
+
+		if (null == closeable) {
+			return false;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			if (closed) {
+				throw new IllegalStateException("Cannot register Closable, registry is already closed.");
+			}
+
+			return registeredCloseables.add(closeable);
+		}
+	}
+
+	public boolean unregisterClosable(Closeable closeable) {
+
+		if (null == closeable) {
+			return false;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			return registeredCloseables.remove(closeable);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+
+		synchronized (getSynchronizationLock()) {
+
+			for (Closeable closeable : registeredCloseables) {
+				IOUtils.closeQuietly(closeable);
+			}
+
+			registeredCloseables.clear();
+
+			// mark the registry as closed also when it was empty,
+			// so that registrations after close() fail fast
+			closed = true;
+		}
+	}
+
+	private Object getSynchronizationLock() {
+		return registeredCloseables;
+	}
+}
\ No newline at end of file
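
A small usage sketch of the registry above: a blocking read registers its stream so that a concurrent cancellation can close the registry and unblock the read. The helper below is illustrative, not part of the change:

    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.runtime.state.ClosableRegistry;

    import java.io.IOException;

    public class CancelAwareReadSketch {

        // registering the stream lets ClosableRegistry#close() abort this read from another thread
        public static byte[] readFully(ClosableRegistry registry, FSDataInputStream in, int len) throws IOException {
            byte[] buffer = new byte[len];
            registry.registerClosable(in);
            try {
                int pos = 0;
                while (pos < len) {
                    int read = in.read(buffer, pos, len - pos);
                    if (read == -1) {
                        throw new IOException("Unexpected end of stream");
                    }
                    pos += read;
                }
                return buffer;
            } finally {
                registry.unregisterClosable(in);
                in.close();
            }
        }
    }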

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
new file mode 100644
index 0000000..0bd5eeb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Default implementation of OperatorStateStore that provides the ability to make snapshots.
+ */
+public class DefaultOperatorStateBackend implements OperatorStateBackend {
+
+	private final Map<String, PartitionableListState<?>> registeredStates;
+	private final Collection<OperatorStateHandle> restoreSnapshots;
+	private final ClosableRegistry closeStreamOnCancelRegistry;
+
+	/**
+	 * Restores an OperatorStateStore lazily from the provided snapshots.
+	 *
+	 * @param restoreSnapshots snapshots that are available to restore partitionable states on request.
+	 */
+	public DefaultOperatorStateBackend(
+			Collection<OperatorStateHandle> restoreSnapshots) {
+		this.restoreSnapshots = restoreSnapshots;
+		this.registeredStates = new HashMap<>();
+		this.closeStreamOnCancelRegistry = new ClosableRegistry();
+	}
+
+	/**
+	 * Creates an empty OperatorStateStore.
+	 */
+	public DefaultOperatorStateBackend() {
+		this(null);
+	}
+
+	/**
+	 * @see OperatorStateStore
+	 */
+	@Override
+	public <S> ListState<S> getPartitionableState(
+			ListStateDescriptor<S> stateDescriptor) throws IOException {
+
+		Preconditions.checkNotNull(stateDescriptor);
+
+		String name = Preconditions.checkNotNull(stateDescriptor.getName());
+		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer());
+
+		@SuppressWarnings("unchecked")
+		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
+
+		if (null == partitionableListState) {
+
+			partitionableListState = new PartitionableListState<>(partitionStateSerializer);
+
+			registeredStates.put(name, partitionableListState);
+
+			// Try to restore previous state if state handles to snapshots are provided
+			if (restoreSnapshots != null) {
+				for (OperatorStateHandle stateHandle : restoreSnapshots) {
+
+					long[] offsets = stateHandle.getStateNameToPartitionOffsets().get(name);
+
+					if (offsets != null) {
+
+						FSDataInputStream in = stateHandle.openInputStream();
+						try {
+							closeStreamOnCancelRegistry.registerClosable(in);
+
+							DataInputView div = new DataInputViewStreamWrapper(in);
+
+							for (int i = 0; i < offsets.length; ++i) {
+
+								in.seek(offsets[i]);
+								S partitionState = partitionStateSerializer.deserialize(div);
+								partitionableListState.add(partitionState);
+							}
+						} finally {
+							closeStreamOnCancelRegistry.unregisterClosable(in);
+							in.close();
+						}
+					}
+				}
+			}
+		}
+
+		return partitionableListState;
+	}
+
+	/**
+	 * @see SnapshotProvider
+	 */
+	@Override
+	public RunnableFuture<OperatorStateHandle> snapshot(
+			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+
+		if (registeredStates.isEmpty()) {
+			return new DoneFuture<>(null);
+		}
+
+		Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
+
+		CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
+				createCheckpointStateOutputStream(checkpointId, timestamp);
+
+		try {
+			closeStreamOnCancelRegistry.registerClosable(out);
+
+			DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+			dov.writeInt(registeredStates.size());
+			for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+
+				long[] partitionOffsets = entry.getValue().write(out);
+				writtenStatesMetaData.put(entry.getKey(), partitionOffsets);
+			}
+
+			OperatorStateHandle handle = new OperatorStateHandle(out.closeAndGetHandle(), writtenStatesMetaData);
+
+			return new DoneFuture<>(handle);
+		} finally {
+			closeStreamOnCancelRegistry.unregisterClosable(out);
+			out.close();
+		}
+	}
+
+	@Override
+	public void dispose() {
+
+	}
+
+	static final class PartitionableListState<S> implements ListState<S> {
+
+		private final List<S> listState;
+		private final TypeSerializer<S> partitionStateSerializer;
+
+		public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+			this.listState = new ArrayList<>();
+			this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+		}
+
+		@Override
+		public void clear() {
+			listState.clear();
+		}
+
+		@Override
+		public Iterable<S> get() {
+			return listState;
+		}
+
+		@Override
+		public void add(S value) {
+			listState.add(value);
+		}
+
+		public long[] write(FSDataOutputStream out) throws IOException {
+
+			long[] partitionOffsets = new long[listState.size()];
+
+			DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+			for (int i = 0; i < listState.size(); ++i) {
+				S element = listState.get(i);
+				partitionOffsets[i] = out.getPos();
+				partitionStateSerializer.serialize(element, dov);
+			}
+
+			return partitionOffsets;
+		}
+	}
+
+	@Override
+	public Set<String> getRegisteredStateNames() {
+		return registeredStates.keySet();
+	}
+
+	@Override
+	public void close() throws IOException {
+		closeStreamOnCancelRegistry.close();
+	}
+}
+
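
A minimal sketch of the new backend from a user's perspective: registering a partitionable list state lazily restores it if matching snapshots were handed to the backend. The state name and values are made up for this sketch:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.runtime.state.DefaultOperatorStateBackend;

    public class PartitionableStateSketch {

        public static void main(String[] args) throws Exception {
            DefaultOperatorStateBackend backend = new DefaultOperatorStateBackend();

            // registers (and, on recovery, restores) the partitionable list state
            ListState<String> pendingFiles = backend.getPartitionableState(
                    new ListStateDescriptor<>("pending-files", StringSerializer.INSTANCE));

            pendingFiles.add("file-1");
            pendingFiles.add("file-2");

            // at checkpoint time the task would call
            // backend.snapshot(checkpointId, timestamp, streamFactory) to obtain an OperatorStateHandle
        }
    }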

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
index 4f0a82b..8e7207e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
  */
 public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , Serializable {
 
+	private static final long serialVersionUID = 6595415219136429696L;
+
 	/** the range of key-groups */
 	private final KeyGroupRange keyGroupRange;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 7f87e86..ea12808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -138,7 +138,6 @@ public class KeyGroupsStateHandle implements StateObject {
 			return false;
 		}
 		return stateHandle.equals(that.stateHandle);
-
 	}
 
 	@Override
@@ -155,9 +154,4 @@ public class KeyGroupsStateHandle implements StateObject {
 				", data=" + stateHandle +
 				'}';
 	}
-
-	@Override
-	public void close() throws IOException {
-		stateHandle.close();
-	}
 }