You are viewing a plain-text version of this content. The canonical link for it was given as a hyperlink ("here"), which is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:37 UTC

[19/27] flink git commit: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ac4503d..9025090 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -18,46 +18,56 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import com.google.common.collect.Iterables;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
 
-import java.io.Serializable;
+import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
- * Simple container class which contains the task state and key-value state handles for the sub
+ * Simple container class which contains the task state and key-group state handles for the sub
  * tasks of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
  *
- * This class basically groups all tasks and key groups belonging to the same job vertex together.
+ * This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together.
  */
-public class TaskState implements Serializable {
+public class TaskState implements StateObject {
 
 	private static final long serialVersionUID = -4845578005863201810L;
 
 	private final JobVertexID jobVertexID;
 
-	/** Map of task states which can be accessed by their sub task index */
+	/** handles to non-partitioned states, subtaskindex -> subtaskstate */
 	private final Map<Integer, SubtaskState> subtaskStates;
 
-	/** Map of key-value states which can be accessed by their key group index */
-	private final Map<Integer, KeyGroupState> kvStates;
+	/** handles to partitioned states, subtaskindex -> keyed state */
+	private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles;
 
-	/** Parallelism of the operator when it was checkpointed */
+	/** parallelism of the operator when it was checkpointed */
 	private final int parallelism;
 
-	public TaskState(JobVertexID jobVertexID, int parallelism) {
-		this.jobVertexID = jobVertexID;
+	/** maximum parallelism of the operator when the job was first created */
+	private final int maxParallelism;
 
-		this.subtaskStates = new HashMap<>(parallelism);
+	public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
+		Preconditions.checkArgument(
+				parallelism <= maxParallelism,
+				"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
 
-		this.kvStates = new HashMap<>();
+		this.jobVertexID = jobVertexID;
+		//preallocate lists of the required size, so that we can randomly set values to indexes
+		this.subtaskStates = new HashMap<>(parallelism);
+		this.keyGroupsStateHandles = new HashMap<>(parallelism);
 
 		this.parallelism = parallelism;
+		this.maxParallelism = maxParallelism;
 	}
 
 	public JobVertexID getJobVertexID() {
@@ -65,6 +75,8 @@ public class TaskState implements Serializable {
 	}
 
 	public void putState(int subtaskIndex, SubtaskState subtaskState) {
+		Preconditions.checkNotNull(subtaskState);
+
 		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
 			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
 				" exceeds the maximum number of sub tasks " + subtaskStates.size());
@@ -73,31 +85,38 @@ public class TaskState implements Serializable {
 		}
 	}
 
-	public SubtaskState getState(int subtaskIndex) {
+	public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle keyGroupsStateHandle) {
+		Preconditions.checkNotNull(keyGroupsStateHandle);
+
 		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
 			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
-				" exceeds the maximum number of sub tasks " + subtaskStates.size());
+					" exceeds the maximum number of sub tasks " + subtaskStates.size());
 		} else {
-			return subtaskStates.get(subtaskIndex);
+			keyGroupsStateHandles.put(subtaskIndex, keyGroupsStateHandle);
 		}
 	}
 
-	public Collection<SubtaskState> getStates() {
-		return subtaskStates.values();
-	}
-
-	public long getStateSize() {
-		long result = 0L;
 
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			result += subtaskState.getStateSize();
+	public SubtaskState getState(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + subtaskStates.size());
+		} else {
+			return subtaskStates.get(subtaskIndex);
 		}
+	}
 
-		for (KeyGroupState keyGroupState : kvStates.values()) {
-			result += keyGroupState.getStateSize();
+	public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+					" exceeds the maximum number of sub tasks " + keyGroupsStateHandles.size());
+		} else {
+			return keyGroupsStateHandles.get(subtaskIndex);
 		}
+	}
 
-		return result;
+	public Collection<SubtaskState> getStates() {
+		return subtaskStates.values();
 	}
 
 	public int getNumberCollectedStates() {
@@ -108,48 +127,44 @@ public class TaskState implements Serializable {
 		return parallelism;
 	}
 
-	public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
-		kvStates.put(keyGroupId, keyGroupState);
+	public int getMaxParallelism() {
+		return maxParallelism;
 	}
 
-	public KeyGroupState getKvState(int keyGroupId) {
-		return kvStates.get(keyGroupId);
+	public Collection<KeyGroupsStateHandle> getKeyGroupStates() {
+		return keyGroupsStateHandles.values();
 	}
 
-	/**
-	 * Retrieve the set of key-value state key groups specified by the given key group partition set.
-	 * The key groups are returned as a map where the key group index maps to the serialized state
-	 * handle of the key group.
-	 *
-	 * @param keyGroupPartition Set of key group indices
-	 * @return Map of serialized key group state handles indexed by their key group index.
-	 */
-	public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
-		HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
-
-		for (Integer keyGroupId : keyGroupPartition) {
-			KeyGroupState keyGroupState = kvStates.get(keyGroupId);
-
-			if (keyGroupState != null) {
-				result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
+	public boolean hasNonPartitionedState() {
+		for(SubtaskState sts : subtaskStates.values()) {
+			if (sts != null && !sts.getChainedStateHandle().isEmpty()) {
+				return true;
 			}
 		}
-
-		return result;
+		return false;
 	}
 
-	public int getNumberCollectedKvStates() {
-		return kvStates.size();
+	@Override
+	public void discardState() throws Exception {
+		StateUtil.bestEffortDiscardAllStateObjects(
+				Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
 	}
 
-	public void discard(ClassLoader classLoader) throws Exception {
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			subtaskState.discard(classLoader);
-		}
 
-		for (KeyGroupState keyGroupState : kvStates.values()) {
-			keyGroupState.discard(classLoader);
+	@Override
+	public long getStateSize() throws Exception {
+		long result = 0L;
+
+		for (int i = 0; i < parallelism; i++) {
+			if (subtaskStates.get(i) != null) {
+				result += subtaskStates.get(i).getStateSize();
+			}
+			if (keyGroupsStateHandles.get(i) != null) {
+				result += keyGroupsStateHandles.get(i).getStateSize();
+			}
 		}
+
+		return result;
 	}
 
 	@Override
@@ -158,7 +173,7 @@ public class TaskState implements Serializable {
 			TaskState other = (TaskState) obj;
 
 			return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
-				subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
+				subtaskStates.equals(other.subtaskStates) && keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
 		} else {
 			return false;
 		}
@@ -166,6 +181,20 @@ public class TaskState implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
+		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, keyGroupsStateHandles);
+	}
+
+	@Override
+	public void close() throws IOException {
+		StateUtil.bestEffortCloseAllStateObjects(
+				Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+	}
+
+	public Map<Integer, SubtaskState> getSubtaskStates() {
+		return Collections.unmodifiableMap(subtaskStates);
+	}
+
+	public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() {
+		return Collections.unmodifiableMap(keyGroupsStateHandles);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 376ef70..b826d9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,9 +24,9 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	private final ClassLoader userClassLoader;
 
 	/** Local completed checkpoints. */
-	private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+	private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
 
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -101,7 +101,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			ClassLoader userClassLoader,
 			CuratorFramework client,
 			String checkpointsPath,
-			StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
+			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
 
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
@@ -143,7 +143,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		checkpointStateHandles.clear();
 
 		// Get all there is first
-		List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
+		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
 		while (true) {
 			try {
 				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
@@ -161,10 +161,10 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		if (numberOfInitialCheckpoints > 0) {
 			// Take the last one. This is the latest checkpoints, because path names are strictly
 			// increasing (checkpoint ID).
-			Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
+			Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
 					.get(numberOfInitialCheckpoints - 1);
 
-			CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
+			CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState();
 
 			checkpointStateHandles.add(latest);
 
@@ -193,7 +193,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// First add the new one. If it fails, we don't want to loose existing data.
 		String path = String.format("/%s", checkpoint.getCheckpointID());
 
-		final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+		final RetrievableStateHandle<CompletedCheckpoint> stateHandle =
+				checkpointsInZooKeeper.add(path, checkpoint);
 
 		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
 
@@ -211,7 +212,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			return null;
 		}
 		else {
-			return checkpointStateHandles.getLast().f0.getState(userClassLoader);
+			return checkpointStateHandles.getLast().f0.retrieveState();
 		}
 	}
 
@@ -219,8 +220,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
 		List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
 
-		for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
-			checkpoints.add(stateHandle.f0.getState(userClassLoader));
+		for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
+			checkpoints.add(stateHandle.f0.retrieveState());
 		}
 
 		return checkpoints;
@@ -235,7 +236,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	public void shutdown() throws Exception {
 		LOG.info("Shutting down");
 
-		for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+		for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
 			try {
 				removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
 			}
@@ -264,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
 	 */
 	private void removeFromZooKeeperAndDiscardCheckpoint(
-			final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
 
 		final BackgroundCallback callback = new BackgroundCallback() {
 			@Override
@@ -273,16 +274,15 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 					if (event.getType() == CuratorEventType.DELETE) {
 						if (event.getResultCode() == 0) {
 							// The checkpoint
-							CompletedCheckpoint checkpoint = stateHandleAndPath
-									.f0.getState(userClassLoader);
-
-							checkpoint.discard(userClassLoader);
-
-							// Discard the state handle
-							stateHandleAndPath.f0.discardState();
-
-							// Discard the checkpoint
-							LOG.debug("Discarded " + checkpoint);
+							try {
+								CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
+								checkpoint.discardState();
+								// Discard the checkpoint
+								LOG.debug("Discarded " + checkpoint);
+							} finally {
+								// Discard the state handle
+								stateHandleAndPath.f0.discardState();
+							}
 						}
 						else {
 							throw new IllegalStateException("Unexpected result code " +

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
index 6efc01e..49f51be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
@@ -142,13 +142,13 @@ public class FsSavepointStore implements SavepointStore {
 	}
 
 	@Override
-	public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+	public void disposeSavepoint(String path) throws Exception {
 		Preconditions.checkNotNull(path, "Path");
-		Preconditions.checkNotNull(classLoader, "Class loader");
 
 		try {
 			Savepoint savepoint = loadSavepoint(path);
-			savepoint.dispose(classLoader);
+			LOG.info("Disposing savepoint: " + path);
+			savepoint.dispose();
 
 			Path filePath = new Path(path);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
index cf30f5f..2cf8f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
@@ -108,9 +108,8 @@ public class HeapSavepointStore implements SavepointStore {
 	}
 
 	@Override
-	public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+	public void disposeSavepoint(String path) throws Exception {
 		Preconditions.checkNotNull(path, "Path");
-		Preconditions.checkNotNull(classLoader, "Class loader");
 
 		Savepoint savepoint;
 		synchronized (shutDownLock) {
@@ -118,7 +117,7 @@ public class HeapSavepointStore implements SavepointStore {
 		}
 
 		if (savepoint != null) {
-			savepoint.dispose(classLoader);
+			savepoint.dispose();
 		} else {
 			throw new IllegalArgumentException("Invalid path '" + path + "'.");
 		}
@@ -131,7 +130,7 @@ public class HeapSavepointStore implements SavepointStore {
 			// available at this point.
 			for (Savepoint savepoint : savepoints.values()) {
 				try {
-					savepoint.dispose(ClassLoader.getSystemClassLoader());
+					savepoint.dispose();
 				} catch (Throwable t) {
 					LOG.warn("Failed to dispose savepoint " + savepoint.getCheckpointId(), t);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 7823ab9..643f14c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -68,13 +68,7 @@ public interface Savepoint {
 
 	/**
 	 * Disposes the savepoint.
-	 *
-	 * <p>The class loader is needed, because savepoints can currently point to
-	 * arbitrary snapshot {@link org.apache.flink.runtime.state.StateHandle}
-	 * instances, which need the user code class loader for deserialization.
-	 *
-	 * @param classLoader Class loader for disposal
 	 */
-	void dispose(ClassLoader classLoader) throws Exception;
+	void dispose() throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 1be7a58..47917b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -60,12 +60,12 @@ public class SavepointLoader {
 			ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
 
 			if (executionJobVertex != null) {
-				if (executionJobVertex.getParallelism() == taskState.getParallelism()) {
+				if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
 					taskStates.put(taskState.getJobVertexID(), taskState);
 				}
 				else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
-									"Parallelism mismatch between savepoint state and new program. " +
+									"Max parallelism mismatch between savepoint state and new program. " +
 									"Cannot map operator %s with parallelism %d to new program with " +
 									"parallelism %d. This indicates that the program has been changed " +
 									"in a non-compatible way after the savepoint.",

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index d06f3d0..20b3d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -28,10 +28,13 @@ import java.util.Map;
  */
 public class SavepointSerializers {
 
+
+	private static final int SAVEPOINT_VERSION_0 = 0;
 	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
 
 	static {
-		SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
+		SERIALIZERS.put(SAVEPOINT_VERSION_0, null);
+		SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
 	}
 
 	/**
@@ -66,7 +69,7 @@ public class SavepointSerializers {
 		if (serializer != null) {
 			return serializer;
 		} else {
-			throw new IllegalArgumentException("Unknown savepoint version " + version + ".");
+			throw new IllegalArgumentException("Cannot restore savepoint version " + version + ".");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 71fcb34..68b88d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -48,15 +48,10 @@ public interface SavepointStore {
 	/**
 	 * Disposes the savepoint at the specified path.
 	 *
-	 * <p>The class loader is needed, because savepoints can currently point to
-	 * arbitrary snapshot {@link org.apache.flink.runtime.state.StateHandle}
-	 * instances, which need the user code class loader for deserialization.
-	 *
 	 * @param path        Path of savepoint to dispose
-	 * @param classLoader Class loader for disposal
 	 * @throws Exception Failures during diposal are forwarded
 	 */
-	void disposeSavepoint(String path, ClassLoader classLoader) throws Exception;
+	void disposeSavepoint(String path) throws Exception;
 
 	/**
 	 * Shut downs the savepoint store.

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
deleted file mode 100644
index d60d80e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-public class SavepointV0 implements Savepoint {
-
-	/** The savepoint version. */
-	public static final int VERSION = 0;
-
-	/** The checkpoint ID */
-	private final long checkpointId;
-
-	/** The task states */
-	private final Collection<TaskState> taskStates = new ArrayList();
-
-	public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
-		this.checkpointId = checkpointId;
-		this.taskStates.addAll(taskStates);
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-
-	@Override
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	@Override
-	public Collection<TaskState> getTaskStates() {
-		return taskStates;
-	}
-
-	@Override
-	public void dispose(ClassLoader classLoader) throws Exception {
-		Preconditions.checkNotNull(classLoader, "Class loader");
-		for (TaskState taskState : taskStates) {
-			taskState.discard(classLoader);
-		}
-		taskStates.clear();
-	}
-
-	@Override
-	public String toString() {
-		return "Savepoint(version=" + VERSION + ")";
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		SavepointV0 that = (SavepointV0) o;
-		return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = (int) (checkpointId ^ (checkpointId >>> 32));
-		result = 31 * result + taskStates.hashCode();
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
deleted file mode 100644
index e82b85f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.KeyGroupState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Serializer for {@link SavepointV0} instances.
- *
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
- */
-class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
-
-	public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
-
-	private SavepointV0Serializer() {
-	}
-
-	@Override
-	public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
-		dos.writeLong(savepoint.getCheckpointId());
-
-		Collection<TaskState> taskStates = savepoint.getTaskStates();
-		dos.writeInt(taskStates.size());
-
-		for (TaskState taskState : savepoint.getTaskStates()) {
-			// Vertex ID
-			dos.writeLong(taskState.getJobVertexID().getLowerPart());
-			dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-			// Parallelism
-			int parallelism = taskState.getParallelism();
-			dos.writeInt(parallelism);
-
-			// Sub task states
-			dos.writeInt(taskState.getNumberCollectedStates());
-
-			for (int i = 0; i < parallelism; i++) {
-				SubtaskState subtaskState = taskState.getState(i);
-
-				if (subtaskState != null) {
-					dos.writeInt(i);
-
-					SerializedValue<?> serializedValue = subtaskState.getState();
-					if (serializedValue == null) {
-						dos.writeInt(-1); // null
-					} else {
-						byte[] serialized = serializedValue.getByteArray();
-						dos.writeInt(serialized.length);
-						dos.write(serialized, 0, serialized.length);
-					}
-
-					dos.writeLong(subtaskState.getStateSize());
-					dos.writeLong(subtaskState.getDuration());
-				}
-			}
-
-			// Key group states
-			dos.writeInt(taskState.getNumberCollectedKvStates());
-
-			for (int i = 0; i < parallelism; i++) {
-				KeyGroupState keyGroupState = taskState.getKvState(i);
-
-				if (keyGroupState != null) {
-					dos.write(i);
-
-					SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
-					if (serializedValue == null) {
-						dos.writeInt(-1); // null
-					} else {
-						byte[] serialized = serializedValue.getByteArray();
-						dos.writeInt(serialized.length);
-						dos.write(serialized, 0, serialized.length);
-					}
-
-					dos.writeLong(keyGroupState.getStateSize());
-					dos.writeLong(keyGroupState.getDuration());
-				}
-			}
-		}
-	}
-
-	@Override
-	public SavepointV0 deserialize(DataInputStream dis) throws IOException {
-		long checkpointId = dis.readLong();
-
-		// Task states
-		int numTaskStates = dis.readInt();
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
-			int parallelism = dis.readInt();
-
-			// Add task state
-			TaskState taskState = new TaskState(jobVertexId, parallelism);
-			taskStates.add(taskState);
-
-			// Sub task states
-			int numSubTaskStates = dis.readInt();
-			for (int j = 0; j < numSubTaskStates; j++) {
-				int subtaskIndex = dis.readInt();
-
-				int length = dis.readInt();
-
-				SerializedValue<StateHandle<?>> serializedValue;
-				if (length == -1) {
-					serializedValue = new SerializedValue<>(null);
-				} else {
-					byte[] serializedData = new byte[length];
-					dis.readFully(serializedData, 0, length);
-					serializedValue = SerializedValue.fromBytes(serializedData);
-				}
-
-				long stateSize = dis.readLong();
-				long duration = dis.readLong();
-
-				SubtaskState subtaskState = new SubtaskState(
-						serializedValue,
-						stateSize,
-						duration);
-
-				taskState.putState(subtaskIndex, subtaskState);
-			}
-
-			// Key group states
-			int numKvStates = dis.readInt();
-			for (int j = 0; j < numKvStates; j++) {
-				int keyGroupIndex = dis.readInt();
-
-				int length = dis.readInt();
-
-				SerializedValue<StateHandle<?>> serializedValue;
-				if (length == -1) {
-					serializedValue = new SerializedValue<>(null);
-				} else {
-					byte[] serializedData = new byte[length];
-					dis.readFully(serializedData, 0, length);
-					serializedValue = SerializedValue.fromBytes(serializedData);
-				}
-
-				long stateSize = dis.readLong();
-				long duration = dis.readLong();
-
-				KeyGroupState keyGroupState = new KeyGroupState(
-						serializedValue,
-						stateSize,
-						duration);
-
-				taskState.putKvState(keyGroupIndex, keyGroupState);
-			}
-		}
-
-		return new SavepointV0(checkpointId, taskStates);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
new file mode 100644
index 0000000..5976bbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Savepoint version 1.
+ *
+ * <p>For every task, this format holds the chained (non-partitioned) operator state and the
+ * key-group state handles; see {@link SavepointV1Serializer} for the binary layout.
+ */
+public class SavepointV1 implements Savepoint {
+
+	/** The savepoint version. */
+	public static final int VERSION = 1;
+
+	/** The checkpoint ID */
+	private final long checkpointId;
+
+	/** The task states; an owned copy so that {@link #dispose()} can safely clear it. */
+	private final Collection<TaskState> taskStates;
+
+	/**
+	 * Creates a savepoint from the given task states.
+	 *
+	 * @param checkpointId The checkpoint ID
+	 * @param taskStates   Task states to include; they are copied, so later changes to the passed
+	 *                     collection are not reflected and {@link #dispose()} never clears it
+	 * @throws NullPointerException if {@code taskStates} is null
+	 */
+	public SavepointV1(long checkpointId, Collection<TaskState> taskStates) {
+		this.checkpointId = checkpointId;
+		// Defensive copy: dispose() clears this collection, which would otherwise mutate (or, for
+		// unmodifiable views, fail on) a collection owned by the caller.
+		this.taskStates = new ArrayList<>(Preconditions.checkNotNull(taskStates, "Task States"));
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	@Override
+	public Collection<TaskState> getTaskStates() {
+		return taskStates;
+	}
+
+	/**
+	 * Discards the state of every task and then clears the task states held by this savepoint.
+	 */
+	@Override
+	public void dispose() throws Exception {
+		for (TaskState taskState : taskStates) {
+			taskState.discardState();
+		}
+		taskStates.clear();
+	}
+
+	@Override
+	public String toString() {
+		return "Savepoint(version=" + VERSION + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SavepointV1 that = (SavepointV1) o;
+		return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (checkpointId ^ (checkpointId >>> 32));
+		result = 31 * result + taskStates.hashCode();
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
new file mode 100644
index 0000000..8e05b81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializer for {@link SavepointV1} instances.
+ *
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+
+	// Type tags written in front of every (possibly null) state handle.
+	private static final byte NULL_HANDLE = 0;
+	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+	private static final byte FILE_STREAM_STATE_HANDLE = 2;
+	private static final byte KEY_GROUPS_HANDLE = 3;
+
+	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
+
+	private SavepointV1Serializer() {
+	}
+
+	/**
+	 * Writes the checkpoint ID followed by, for every task state, its vertex ID, parallelism,
+	 * max parallelism, sub task states and key-group state handles.
+	 *
+	 * @throws IOException if writing fails or a state handle cannot be serialized
+	 */
+	@Override
+	public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+		try {
+			dos.writeLong(savepoint.getCheckpointId());
+
+			Collection<TaskState> taskStates = savepoint.getTaskStates();
+			dos.writeInt(taskStates.size());
+
+			for (TaskState taskState : taskStates) {
+				// Vertex ID
+				dos.writeLong(taskState.getJobVertexID().getLowerPart());
+				dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+				// Parallelism
+				int parallelism = taskState.getParallelism();
+				dos.writeInt(parallelism);
+				dos.writeInt(taskState.getMaxParallelism());
+
+				// Sub task states
+				Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+				dos.writeInt(subtaskStateMap.size());
+				for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+					dos.writeInt(entry.getKey());
+
+					SubtaskState subtaskState = entry.getValue();
+					ChainedStateHandle<StreamStateHandle> chainedStateHandle = subtaskState.getChainedStateHandle();
+					dos.writeInt(chainedStateHandle.getLength());
+					for (int j = 0; j < chainedStateHandle.getLength(); ++j) {
+						StreamStateHandle stateHandle = chainedStateHandle.get(j);
+						serializeStreamStateHandle(stateHandle, dos);
+					}
+
+					dos.writeLong(subtaskState.getDuration());
+				}
+
+				// Key group states
+				Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles = taskState.getKeyGroupsStateHandles();
+				dos.writeInt(keyGroupsStateHandles.size());
+				for (Map.Entry<Integer, KeyGroupsStateHandle> entry : keyGroupsStateHandles.entrySet()) {
+					dos.writeInt(entry.getKey());
+					serializeKeyGroupStateHandle(entry.getValue(), dos);
+				}
+			}
+		} catch (IOException e) {
+			// Propagate I/O failures unchanged instead of wrapping them in another IOException.
+			throw e;
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+	}
+
+	/**
+	 * Reads a savepoint in the format written by {@link #serialize(SavepointV1, DataOutputStream)}.
+	 *
+	 * @throws IOException if reading fails or the stream contains an unknown state handle type
+	 */
+	@Override
+	public SavepointV1 deserialize(DataInputStream dis) throws IOException {
+		long checkpointId = dis.readLong();
+
+		// Task states
+		int numTaskStates = dis.readInt();
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+			int maxParallelism = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+				int chainedStateHandleSize = dis.readInt();
+				List<StreamStateHandle> streamStateHandleList = new ArrayList<>(chainedStateHandleSize);
+				for (int k = 0; k < chainedStateHandleSize; ++k) {
+					StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
+					streamStateHandleList.add(streamStateHandle);
+				}
+
+				long duration = dis.readLong();
+				ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<>(streamStateHandleList);
+				SubtaskState subtaskState = new SubtaskState(chainedStateHandle, duration);
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+
+			// Key group states
+			int numKeyGroupStates = dis.readInt();
+			for (int j = 0; j < numKeyGroupStates; j++) {
+				int keyGroupIndex = dis.readInt();
+
+				KeyGroupsStateHandle keyGroupsStateHandle = deserializeKeyGroupStateHandle(dis);
+				if (keyGroupsStateHandle != null) {
+					taskState.putKeyedState(keyGroupIndex, keyGroupsStateHandle);
+				}
+			}
+		}
+
+		return new SavepointV1(checkpointId, taskStates);
+	}
+
+	/**
+	 * Writes a {@link KeyGroupsStateHandle} (or a null marker): the start key group, the number of
+	 * key groups, one offset per key group, and finally the underlying stream state handle.
+	 */
+	public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+		if (stateHandle != null) {
+			dos.writeByte(KEY_GROUPS_HANDLE);
+			dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
+			dos.writeInt(stateHandle.getNumberOfKeyGroups());
+			for (int keyGroup : stateHandle.keyGroups()) {
+				dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
+			}
+			serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
+		}
+	}
+
+	/**
+	 * Reads a key-group state handle written by {@link #serializeKeyGroupStateHandle}.
+	 *
+	 * @return the handle, or {@code null} if a null marker was written
+	 * @throws IOException if reading fails or the type tag is not a key-group handle
+	 */
+	public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (KEY_GROUPS_HANDLE == type) {
+			int startKeyGroup = dis.readInt();
+			int numKeyGroups = dis.readInt();
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+			long[] offsets = new long[numKeyGroups];
+			for (int i = 0; i < numKeyGroups; ++i) {
+				offsets[i] = dis.readLong();
+			}
+			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		} else {
+			throw new IOException("Unknown implementation of KeyGroupsStateHandle, code: " + type);
+		}
+	}
+
+	/**
+	 * Writes a {@link StreamStateHandle} (or a null marker) preceded by a type tag. Only
+	 * {@link FileStateHandle} (written as its path) and {@link ByteStreamStateHandle} (written as
+	 * length-prefixed bytes) are supported.
+	 *
+	 * @throws IOException if writing fails or the handle is of an unsupported type
+	 */
+	public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+
+		} else if (stateHandle instanceof FileStateHandle) {
+			dos.writeByte(FILE_STREAM_STATE_HANDLE);
+			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+			dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+		} else if (stateHandle instanceof ByteStreamStateHandle) {
+			dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+			ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+			byte[] internalData = byteStreamStateHandle.getData();
+			dos.writeInt(internalData.length);
+			dos.write(internalData);
+
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+		}
+
+		dos.flush();
+	}
+
+	/**
+	 * Reads a stream state handle written by {@link #serializeStreamStateHandle}.
+	 *
+	 * @return the handle, or {@code null} if a null marker was written
+	 * @throws IOException if reading fails, the stream ends prematurely, or the type tag is unknown
+	 */
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		// readByte() (rather than read()) mirrors the writeByte() used for the tag and turns a
+		// premature end of stream into an EOFException instead of an "unknown type -1" error.
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			String pathString = dis.readUTF();
+			return new FileStateHandle(new Path(pathString));
+		} else if (BYTE_STREAM_STATE_HANDLE == type) {
+			int numBytes = dis.readInt();
+			byte[] data = new byte[numBytes];
+			// read(byte[]) may return after filling only part of the array; readFully() does not.
+			dis.readFully(data);
+			return new ByteStreamStateHandle(data);
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 9d47457..2217fd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -22,8 +22,8 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
@@ -140,7 +140,12 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 		}
 
 		synchronized (statsLock) {
-			long overallStateSize = checkpoint.getStateSize();
+			long overallStateSize;
+			try {
+				overallStateSize = checkpoint.getStateSize();
+			} catch (Exception ex) {
+				throw new RuntimeException(ex);
+			}
 
 			// Operator stats
 			Map<JobVertexID, long[][]> statsForSubTasks = new HashMap<>();
@@ -421,7 +426,16 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 	private class CheckpointSizeGauge implements Gauge<Long> {
 		@Override
 		public Long getValue() {
-			return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
+			// Read the field once so the null check and the size lookup use the same checkpoint.
+			CompletedCheckpoint checkpoint = latestCompletedCheckpoint;
+			if (checkpoint == null) {
+				return -1L;
+			}
+			try {
+				return checkpoint.getStateSize();
+			} catch (Exception ex) {
+				throw new RuntimeException("Could not determine the state size of the latest completed checkpoint.", ex);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 60fb45c..8849e93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -25,7 +25,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
@@ -33,6 +35,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -88,7 +91,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of classpaths required to run this task. */
 	private final List<URL> requiredClasspaths;
 
-	private final SerializedValue<StateHandle<?>> operatorState;
+	/** Handle to the non-partitioned state of the operator chain */
+	private final ChainedStateHandle<StreamStateHandle> operatorState;
+
+	/** Handle to the key-grouped state of the head operator in the chain */
+	private final List<KeyGroupsStateHandle> keyGroupState;
 
 	/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
 	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
@@ -114,7 +121,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			int targetSlotNumber,
-			SerializedValue<StateHandle<?>> operatorState) {
+			ChainedStateHandle<StreamStateHandle> operatorState,
+			List<KeyGroupsStateHandle> keyGroupState) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
 		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -139,6 +147,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.requiredClasspaths = checkNotNull(requiredClasspaths);
 		this.targetSlotNumber = targetSlotNumber;
 		this.operatorState = operatorState;
+		this.keyGroupState = keyGroupState;
 	}
 
 	public TaskDeploymentDescriptor(
@@ -178,6 +187,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			requiredJarFiles,
 			requiredClasspaths,
 			targetSlotNumber,
+			null,
 			null);
 	}
 
@@ -316,7 +326,21 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return strBuilder.toString();
 	}
 
+	/**
+	 * Returns the handle to the non-partitioned state of the operator chain.
+	 *
+	 * @return the chained operator state handle; may be {@code null}
+	 */
-	public SerializedValue<StateHandle<?>> getOperatorState() {
+	public ChainedStateHandle<StreamStateHandle> getOperatorState() {
 		return operatorState;
 	}
+
+	/**
+	 * Returns the handles to the key-grouped state of the head operator in the chain.
+	 *
+	 * @return the key-group state handles; may be {@code null}
+	 */
+	public List<KeyGroupsStateHandle> getKeyGroupState() {
+		return keyGroupState;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 2f158fd..1eee9d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -35,9 +35,12 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -166,14 +169,18 @@ public interface Environment {
 	void acknowledgeCheckpoint(long checkpointId);
 
 	/**
-	 * Confirms that the invokable has successfully completed all steps it needed to
-	 * to for the checkpoint with the give checkpoint-ID. This method does include
+	 * Confirms that the invokable has successfully completed all required steps for
+	 * the checkpoint with the given checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
 	 * @param checkpointId The ID of the checkpoint.
-	 * @param state A handle to the state to be included in the checkpoint.   
+	 * @param chainedStateHandle Handle for the chained operator state
+	 * @param keyGroupStateHandles Handles for the key-group (partitioned) state
 	 */
-	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
+	void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles);
 
 	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 5bab780..1981f5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,9 +46,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -134,8 +135,10 @@ public class Execution {
 
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 
-	/** The state with which the execution attempt should start */
-	private SerializedValue<StateHandle<?>> operatorState;
+	private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
+
+	private List<KeyGroupsStateHandle> keyGroupsStateHandles;
+	
 
 	/** The execution context which is used to execute futures. */
 	private ExecutionContext executionContext;
@@ -215,6 +218,14 @@ public class Execution {
 		return this.stateTimestamps[state.ordinal()];
 	}
 
+	public ChainedStateHandle<StreamStateHandle> getChainedStateHandle() {
+		return chainedStateHandle;
+	}
+
+	public List<KeyGroupsStateHandle> getKeyGroupsStateHandles() {
+		return keyGroupsStateHandles;
+	}
+
 	public boolean isFinished() {
 		return state.isTerminal();
 	}
@@ -234,19 +245,22 @@ public class Execution {
 		partialInputChannelDeploymentDescriptors = null;
 	}
 
+	/**
+	 * Sets the initial state for the execution. The state handles are then shipped via the
+	 * {@link TaskDeploymentDescriptor} to the TaskManagers. Must be called while in state CREATED.
+	 *
+	 * @param chainedStateHandle Chained operator state
+	 * @param keyGroupsStateHandles Key-group state (= partitioned state)
+	 */
 	public void setInitialState(
-			SerializedValue<StateHandle<?>> initialState,
-			Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
-
-		if (initialKvState != null && initialKvState.size() > 0) {
-			throw new UnsupportedOperationException("Error: inconsistent handling of key/value state snapshots");
-		}
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupsStateHandles) {
 
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
 		}
-
-		this.operatorState = initialState;
+		this.chainedStateHandle = chainedStateHandle;
+		this.keyGroupsStateHandles = keyGroupsStateHandles;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -373,7 +387,8 @@ public class Execution {
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
 				attemptId,
 				slot,
-				operatorState,
+					chainedStateHandle,
+					keyGroupsStateHandles,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b15f851..f3a8b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -42,9 +42,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -183,14 +185,14 @@ public class ExecutionVertex {
 		return this.jobVertex.getParallelism();
 	}
 
-	public int getParallelSubtaskIndex() {
-		return this.subTaskIndex;
-	}
-
 	public int getMaxParallelism() {
 		return this.jobVertex.getMaxParallelism();
 	}
 
+	public int getParallelSubtaskIndex() {
+		return this.subTaskIndex;
+	}
+
 	public int getNumberOfInputs() {
 		return this.inputEdges.length;
 	}
@@ -229,7 +231,7 @@ public class ExecutionVertex {
 	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
-	
+
 	public Execution getPriorExecutionAttempt(int attemptNumber) {
 		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
 			return priorExecutions.get(attemptNumber);
@@ -238,7 +240,7 @@ public class ExecutionVertex {
 			throw new IllegalArgumentException("attempt does not exist");
 		}
 	}
-	
+
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
@@ -275,7 +277,7 @@ public class ExecutionVertex {
 		this.inputEdges[inputNumber] = edges;
 
 		// add the consumers to the source
-		// for now (until the receiver initiated handshake is in place), we need to register the 
+		// for now (until the receiver initiated handshake is in place), we need to register the
 		// edges as the execution graph
 		for (ExecutionEdge ee : edges) {
 			ee.getSource().addConsumer(ee, consumerNumber);
@@ -486,11 +488,11 @@ public class ExecutionVertex {
 			ExecutionAttemptID attemptID,
 			ActorGateway sender) {
 		Execution exec = getCurrentExecutionAttempt();
-		
+
 		// check that this is for the correct execution attempt
 		if (exec != null && exec.getAttemptId().equals(attemptID)) {
 			SimpleSlot slot = exec.getAssignedResource();
-			
+
 			// send only if we actually have a target
 			if (slot != null) {
 				ActorGateway gateway = slot.getInstance().getActorGateway();
@@ -517,7 +519,7 @@ public class ExecutionVertex {
 			return false;
 		}
 	}
-	
+
 	/**
 	 * Schedules or updates the consumer tasks of the result partition with the given ID.
 	 */
@@ -633,13 +635,14 @@ public class ExecutionVertex {
 
 	/**
 	 * Creates a task deployment descriptor to deploy a subtask to the given target slot.
-	 * 
+	 *
 	 * TODO: This should actually be in the EXECUTION
 	 */
 	TaskDeploymentDescriptor createDeploymentDescriptor(
 			ExecutionAttemptID executionId,
 			SimpleSlot targetSlot,
-			SerializedValue<StateHandle<?>> operatorState,
+			ChainedStateHandle<StreamStateHandle> operatorState,
+			List<KeyGroupsStateHandle> keyGroupStates,
 			int attemptNumber) {
 
 		// Produced intermediate results
@@ -690,7 +693,8 @@ public class ExecutionVertex {
 			jarFiles,
 			classpaths,
 			targetSlot.getRoot().getSlotNumber(),
-			operatorState);
+			operatorState,
+			keyGroupStates);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 4786388..a623295 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -249,7 +249,7 @@ public class JobVertex implements java.io.Serializable {
 	/**
 	 * Sets the maximum parallelism for the task.
 	 *
-	 * @param maxParallelism The maximum parallelism to be set.
+	 * @param maxParallelism The maximum parallelism to be set. Must be between 1 and Short.MAX_VALUE.
 	 */
 	public void setMaxParallelism(int maxParallelism) {
 		org.apache.flink.util.Preconditions.checkArgument(

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..cab7ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -18,21 +18,27 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+
+import java.util.List;
 
 /**
  * This interface must be implemented by any invokable that has recoverable state and participates
  * in checkpointing.
  */
-public interface StatefulTask<T extends StateHandle<?>> {
+public interface StatefulTask {
 
 	/**
 	 * Sets the initial state of the operator, upon recovery. The initial state is typically
 	 * a snapshot of the state from a previous execution.
 	 * 
-	 * @param stateHandle The handle to the state.
+	 * @param chainedState Handle for the chained operator states.
+	 * @param keyGroupsState Handles for the key-group states.
 	 */
-	void setInitialState(T stateHandle) throws Exception;
+	void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
 
 	/**
 	 * This method is either called directly and asynchronously by the checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7f7c5fe..ec05f1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -25,8 +25,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -98,7 +98,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -153,7 +153,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
+			List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
 				try {
@@ -168,10 +168,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
 
-				for (Tuple2<StateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
-					SubmittedJobGraph jobGraph = jobStateHandle
-							.f0.getState(ClassLoader.getSystemClassLoader());
-
+				for (Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
+					SubmittedJobGraph jobGraph = jobStateHandle.f0.retrieveState();
 					addedJobGraphs.add(jobGraph.getJobId());
 
 					jobGraphs.add(jobGraph);
@@ -196,11 +194,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			verifyIsRunning();
 
 			try {
-				StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
-
-				SubmittedJobGraph jobGraph = jobStateHandle
-						.getState(ClassLoader.getSystemClassLoader());
-
+				SubmittedJobGraph jobGraph = jobGraphsInZooKeeper.get(path).retrieveState();
 				addedJobGraphs.add(jobGraph.getJobId());
 
 				LOG.info("Recovered {}.", jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index a4d438b..0c56603 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -20,51 +20,50 @@ package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
  * individual task is completed.
  * 
- * This message may carry the handle to the task's state.
+ * <p>This message may carry the handle to the task's chained operator state and the handles to
+ * its key-group state.
  */
 public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
 
 	private static final long serialVersionUID = -7606214777192401493L;
 	
-	private final SerializedValue<StateHandle<?>> state;
+	private final ChainedStateHandle<StreamStateHandle> stateHandle;
 
-	/**
-	 * The state size. This is an optimization in order to not deserialize the
-	 * state handle at the checkpoint coordinator when gathering stats about
-	 * the checkpoints.
-	 */
-	private final long stateSize;
+	private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
 
 	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-		this(job, taskExecutionId, checkpointId, null, 0);
+		this(job, taskExecutionId, checkpointId, null, null);
 	}
 
 	public AcknowledgeCheckpoint(
-			JobID job,
-			ExecutionAttemptID taskExecutionId,
-			long checkpointId,
-			SerializedValue<StateHandle<?>> state,
-			long stateSize) {
+		JobID job,
+		ExecutionAttemptID taskExecutionId,
+		long checkpointId,
+		ChainedStateHandle<StreamStateHandle> state,
+		List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
 
 		super(job, taskExecutionId, checkpointId);
-		this.state = state;
-		this.stateSize = stateSize;
+		this.stateHandle = state;
+		this.keyGroupsStateHandle = keyGroupStateAndSizes;
 	}
 
-	public SerializedValue<StateHandle<?>> getState() {
-		return state;
+	public ChainedStateHandle<StreamStateHandle> getStateHandle() {
+		return stateHandle;
 	}
 
-	public long getStateSize() {
-		return stateSize;
+	public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
+		return keyGroupsStateHandle;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -76,8 +75,10 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		}
 		else if (o instanceof AcknowledgeCheckpoint) {
 			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
-			return super.equals(o) && (this.state == null ? that.state == null :
-					(that.state != null && this.state.equals(that.state)));
+			return super.equals(o) &&
+					(this.stateHandle == null ? that.stateHandle == null : (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
+					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
+
 		}
 		else {
 			return false;
@@ -86,7 +87,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	@Override
 	public String toString() {
-		return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s",
-				getCheckpointId(), getJob(), getTaskExecutionId(), state);
+		return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s keyGroupState=%s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), stateHandle, keyGroupsStateHandle);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
index 609158d..5966c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
@@ -20,27 +20,26 @@ package org.apache.flink.runtime.state;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  * A simple base for closable handles.
- * 
+ *
  * Offers to register a stream (or other closable object) that close calls are delegated to if
  * the handle is closed or was already closed.
  */
-public abstract class AbstractCloseableHandle implements Closeable, Serializable {
+public abstract class AbstractCloseableHandle implements Closeable, StateObject {
 
 	/** Serial Version UID must be constant to maintain format compatibility */
 	private static final long serialVersionUID = 1L;
 
 	/** To atomically update the "closable" field without needing to add a member class like "AtomicBoolean */
-	private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER = 
+	private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
 			AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
 
 	// ------------------------------------------------------------------------
 
-	/** The closeable to close if this handle is closed late */ 
+	/** The closeable to close if this handle is closed late */
 	private transient volatile Closeable toClose;
 
 	/** Flag to remember if this handle was already closed */
@@ -53,7 +52,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
 		if (toClose == null) {
 			return;
 		}
-		
+
 		// NOTE: The order of operations matters here:
 		// (1) first setting the closeable
 		// (2) checking the flag.
@@ -73,16 +72,16 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
 
 	/**
 	 * Closes the handle.
-	 * 
+	 *
 	 * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
 	 * then this will be closes.
-	 * 
+	 *
 	 * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
 	 * it will immediately be closed and that method will throw an exception.
-	 * 
+	 *
 	 * @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
 	 *                     are forwarded.
-	 * 
+	 *
 	 * @see #registerCloseable(Closeable)
 	 */
 	@Override
@@ -106,7 +105,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
 
 	/**
 	 * Checks whether this handle has been closed.
-	 * 
+	 *
 	 * @return True is the handle is closed, false otherwise.
 	 */
 	public boolean isClosed() {
@@ -116,7 +115,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
 	/**
 	 * This method checks whether the handle is closed and throws an exception if it is closed.
 	 * If the handle is not closed, this method does nothing.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the handle has been closed.
 	 */
 	public void ensureNotClosed() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 6fc9475..b2cde22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -33,17 +33,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -122,7 +117,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	 */
 	public abstract void close() throws Exception;
 
-	public void dispose() {
+	public void discardState() throws Exception {
 		if (kvStateRegistry != null) {
 			kvStateRegistry.unregisterAll();
 		}
@@ -418,37 +413,6 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception;
 
-	/**
-	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
-	 * When the stream is closes, it returns a state handle that can retrieve the state back.
-	 *
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return An DataOutputView stream that writes state for the given checkpoint.
-	 *
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
-	 */
-	public CheckpointStateOutputView createCheckpointStateOutputView(
-			long checkpointID, long timestamp) throws Exception {
-		return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
-	}
-
-	/**
-	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
-	 *
-	 * @param state The state to be checkpointed.
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @param <S> The type of the state.
-	 *
-	 * @return A state handle that can retrieve the checkpoined state.
-	 *
-	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
-	 */
-	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception;
-
-
 	// ------------------------------------------------------------------------
 	//  Checkpoint state output stream
 	// ------------------------------------------------------------------------
@@ -456,7 +420,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	/**
 	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
 	 */
-	public static abstract class CheckpointStateOutputStream extends OutputStream {
+	public static abstract class CheckpointStateOutputStream extends FSDataOutputStream {
 
 		/**
 		 * Closes the stream and gets a state handle that can create an input stream
@@ -467,67 +431,4 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 		 */
 		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
 	}
-
-	/**
-	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
-	 */
-	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-
-		private final CheckpointStateOutputStream out;
-
-		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
-			super(out);
-			this.out = out;
-		}
-
-		/**
-		 * Closes the stream and gets a state handle that can create a DataInputView.
-		 * producing the data written to this stream.
-		 *
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
-			return new DataInputViewHandle(out.closeAndGetHandle());
-		}
-
-		@Override
-		public void close() throws IOException {
-			out.close();
-		}
-	}
-
-	/**
-	 * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
-	 */
-	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
-
-		private static final long serialVersionUID = 2891559813513532079L;
-
-		private final StreamStateHandle stream;
-
-		private DataInputViewHandle(StreamStateHandle stream) {
-			this.stream = stream;
-		}
-
-		@Override
-		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
-			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			stream.discardState();
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			return stream.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			stream.close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
deleted file mode 100644
index fee1efe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * {@link StateHandle} that can asynchronously materialize the state that it represents. Instead
- * of representing a materialized handle to state this would normally hold the (immutable) state
- * internally and can materialize it if requested.
- */
-public abstract class AsynchronousStateHandle<T> implements StateHandle<T> {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Materializes the state held by this {@code AsynchronousStateHandle}.
-	 */
-	public abstract StateHandle<T> materialize() throws Exception;
-
-	@Override
-	public final T getState(ClassLoader userCodeClassLoader) throws Exception {
-		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
-	}
-
-	@Override
-	public final void discardState() throws Exception {
-		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
-	}
-}