You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:35 UTC

[17/27] flink git commit: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dea3452..d62b13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -40,9 +40,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * State handles backed by ZooKeeper.
  *
- * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
- * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
- * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs.
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the
+ * amount of data in ZooKeeper small. ZooKeeper is build for data in the KB range whereas
+ * state can grow to multiple MBs.
  *
  * <p>State modifications require some care, because it is possible that certain failures bring
  * the state handle backend and ZooKeeper out of sync.
@@ -72,7 +73,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	/** Curator ZooKeeper client */
 	private final CuratorFramework client;
 
-	private final StateStorageHelper<T> storage;
+	private final RetrievableStateStorageHelper<T> storage;
 
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
@@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		StateStorageHelper storage) throws IOException {
+		RetrievableStateStorageHelper<T> storage) throws IOException {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
@@ -94,9 +95,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * Creates a state handle and stores it in ZooKeeper with create mode {@link
 	 * CreateMode#PERSISTENT}.
 	 *
-	 * @see #add(String, Serializable, CreateMode)
+	 * @see #add(String, T, CreateMode)
 	 */
-	public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
+	public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
 		return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
 	}
 
@@ -111,39 +112,39 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                        start with a '/')
 	 * @param state           State to be added
 	 * @param createMode      The create mode for the new path in ZooKeeper
-	 * @return Created {@link StateHandle}
+	 *
+	 * @return The Created {@link RetrievableStateHandle}.
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
-	public StateHandle<T> add(
+	public RetrievableStateHandle<T> add(
 			String pathInZooKeeper,
 			T state,
 			CreateMode createMode) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
-		StateHandle<T> stateHandle = storage.store(state);
+		RetrievableStateHandle<T> storeHandle = storage.store(state);
 
 		boolean success = false;
 
 		try {
 			// Serialize the state handle. This writes the state to the backend.
-			byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+			byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
 
 			// Write state handle (not the actual state) to ZooKeeper. This is expected to be
 			// smaller than the state itself. This level of indirection makes sure that data in
 			// ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
 			// the state can be larger.
-			client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle);
+			client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle);
 
 			success = true;
-
-			return stateHandle;
+			return storeHandle;
 		}
 		finally {
 			if (!success) {
 				// Cleanup the state handle if it was not written to ZooKeeper.
-				if (stateHandle != null) {
-					stateHandle.discardState();
+				if (storeHandle != null) {
+					storeHandle.discardState();
 				}
 			}
 		}
@@ -161,31 +162,29 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
-		StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+		RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
 
-		StateHandle<T> stateHandle = storage.store(state);
+		RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
 		boolean success = false;
 
 		try {
 			// Serialize the new state handle. This writes the state to the backend.
-			byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+			byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
 
 			// Replace state handle in ZooKeeper.
 			client.setData()
 					.withVersion(expectedVersion)
 					.forPath(pathInZooKeeper, serializedStateHandle);
-
 			success = true;
-		}
-		finally {
-			if (success) {
+		} finally {
+			if(success) {
 				oldStateHandle.discardState();
-			}
-			else {
-				stateHandle.discardState();
+			} else {
+				newStateHandle.discardState();
 			}
 		}
+
 	}
 
 	/**
@@ -216,13 +215,11 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+	public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		byte[] data = client.getData().forPath(pathInZooKeeper);
-
-		return (StateHandle<T>) InstantiationUtil
-				.deserializeObject(data, ClassLoader.getSystemClassLoader());
+		return InstantiationUtil.deserializeObject(data);
 	}
 
 	/**
@@ -234,8 +231,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
-		final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception {
+		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
 
@@ -254,7 +251,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final StateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = get(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
@@ -272,6 +269,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		return stateHandles;
 	}
 
+
 	/**
 	 * Gets all available state handles from ZooKeeper sorted by name (ascending).
 	 *
@@ -281,8 +279,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
-		final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByName() throws Exception {
+		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
 
@@ -303,7 +301,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final StateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = get(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
@@ -364,7 +362,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		StateHandle<T> stateHandle = get(pathInZooKeeper);
+		RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
 
 		// Delete the state handle from ZooKeeper first
 		client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
@@ -381,7 +379,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	public void removeAndDiscardAllState() throws Exception {
-		final List<Tuple2<StateHandle<T>, String>> allStateHandles = getAll();
+		final List<Tuple2<RetrievableStateHandle<T>, String>> allStateHandles = getAll();
 
 		ZKPaths.deleteChildren(
 				client.getZookeeperClient().getZooKeeper(),
@@ -389,7 +387,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 				false);
 
 		// Discard the state handles only after they have been successfully deleted from ZooKeeper.
-		for (Tuple2<StateHandle<T>, String> stateHandleAndPath : allStateHandles) {
+		for (Tuple2<RetrievableStateHandle<T>, String> stateHandleAndPath : allStateHandles) {
 			stateHandleAndPath.f0.discardState();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index 6692ef0..a534b40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.zookeeper.filesystem;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
- * {@link StateStorageHelper} implementation which stores the state in the given filesystem path.
+ * {@link RetrievableStateStorageHelper} implementation which stores the state in the given filesystem path.
  *
- * @param <T>
+ * @param <T> The type of the data that can be stored by this storage helper.
  */
-public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> {
+public class FileSystemStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
 
 	private final Path rootPath;
 
@@ -56,7 +56,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta
 	}
 
 	@Override
-	public StateHandle<T> store(T state) throws Exception {
+	public RetrievableStateHandle<T> store(T state) throws Exception {
 		Exception latestException = null;
 
 		for (int attempt = 0; attempt < 10; attempt++) {
@@ -73,8 +73,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta
 			try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
 				os.writeObject(state);
 			}
-
-			return new FileSerializableStateHandle<>(filePath);
+			return new RetrievableStreamStateHandle<T>(filePath);
 		}
 
 		throw new Exception("Could not open output stream for state backend", latestException);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 356f1a9..407fa01 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -738,29 +738,18 @@ class JobManager(
           sender() ! TriggerSavepointFailure(jobId, new IllegalArgumentException("Unknown job."))
       }
 
-    case DisposeSavepoint(savepointPath, blobKeys) =>
+    case DisposeSavepoint(savepointPath) =>
       val senderRef = sender()
       future {
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
 
-          if (blobKeys.isDefined) {
-            // We don't need a real ID here for the library cache manager
-            val jid = new JobID()
+          val savepoint = savepointStore.loadSavepoint(savepointPath)
 
-            try {
-              libraryCacheManager.registerJob(jid, blobKeys.get, java.util.Collections.emptyList())
-              val classLoader = libraryCacheManager.getClassLoader(jid)
+          log.debug(s"$savepoint")
 
-              // Discard with user code loader
-              savepointStore.disposeSavepoint(savepointPath, classLoader)
-            } finally {
-              libraryCacheManager.unregisterJob(jid)
-            }
-          } else {
-            // Discard with system class loader
-            savepointStore.disposeSavepoint(savepointPath, getClass.getClassLoader)
-          }
+          // Dispose the savepoint
+          savepointStore.disposeSavepoint(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 40c4dcf..5e2b547 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -490,13 +490,9 @@ object JobManagerMessages {
     * Disposes a savepoint.
     *
     * @param savepointPath The path of the savepoint to dispose.
-    * @param blobKeys BLOB keys if a user program JAR was uploaded for disposal.
-    *                 This is required when we dispose state which contains
-    *                 custom state instances (e.g. reducing state, rocksDB state).
     */
   case class DisposeSavepoint(
-      savepointPath: String,
-      blobKeys: Option[java.util.List[BlobKey]] = None)
+      savepointPath: String)
     extends RequiresLeaderSessionID
 
   /** Response after a successful savepoint dispose. */

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1816fc9..5416292 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -28,14 +28,19 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -55,8 +60,11 @@ public class CheckpointStateRestoreTest {
 	@Test
 	public void testSetState() {
 		try {
-			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle<SerializableObject>(new SerializableObject()));
+
+			final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+			List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
+			final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
 
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -106,9 +114,9 @@ public class CheckpointStateRestoreTest {
 			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
 			final long checkpointId = pending.getCheckpointId();
 
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, 0));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
 
@@ -119,11 +127,11 @@ public class CheckpointStateRestoreTest {
 			coord.restoreLatestCheckpointedState(map, true, false);
 
 			// verify that each stateful vertex got the state
-			verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
-			verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
-			verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
-			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
-			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+			verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+			verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+			verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any());
+			verify(statelessExec2, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -134,8 +142,10 @@ public class CheckpointStateRestoreTest {
 	@Test
 	public void testStateOnlyPartiallyAvailable() {
 		try {
-			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle<SerializableObject>(new SerializableObject()));
+			final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+			List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
+			final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
 
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -186,9 +196,9 @@ public class CheckpointStateRestoreTest {
 			final long checkpointId = pending.getCheckpointId();
 
 			// the difference to the test "testSetState" is that one stateful subtask does not report state
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 634e177..6182ffd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -107,8 +108,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			// The ZooKeeper implementation discards asynchronously
 			expected[i - 1].awaitDiscard();
 			assertTrue(expected[i - 1].isDiscarded());
-			assertEquals(userClassLoader, expected[i - 1].getDiscardClassLoader());
-
 			assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
 		}
 	}
@@ -183,7 +182,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			// The ZooKeeper implementation discards asynchronously
 			checkpoint.awaitDiscard();
 			assertTrue(checkpoint.isDiscarded());
-			assertEquals(userClassLoader, checkpoint.getDiscardClassLoader());
 		}
 	}
 
@@ -199,14 +197,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		JobVertexID jvid = new JobVertexID();
 
 		Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>();
-		TaskState taskState = new TaskState(jvid, numberOfStates);
+		TaskState taskState = new TaskState(jvid, numberOfStates, numberOfStates);
 		taskGroupStates.put(jvid, taskState);
 
 		for (int i = 0; i < numberOfStates; i++) {
-			SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
+			ChainedStateHandle<StreamStateHandle> stateHandle = CheckpointCoordinatorTest.generateChainedStateHandle(
 					new CheckpointMessagesTest.MyHandle());
 
-			taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
+			taskState.putState(i, new SubtaskState(stateHandle, 0));
 		}
 
 		return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates);
@@ -230,8 +228,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		// Latch for test variants which discard asynchronously
 		private transient final CountDownLatch discardLatch = new CountDownLatch(1);
 
-		private transient ClassLoader discardClassLoader;
-
 		public TestCompletedCheckpoint(
 			JobID jobId,
 			long checkpointId,
@@ -242,11 +238,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 
 		@Override
-		public void discard(ClassLoader userClassLoader) throws Exception {
-			super.discard(userClassLoader);
+		public void discardState() throws Exception {
+			super.discardState();
 
 			if (!isDiscarded) {
-				this.discardClassLoader = userClassLoader;
 				this.isDiscarded = true;
 
 				if (discardLatch != null) {
@@ -265,10 +260,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			}
 		}
 
-		public ClassLoader getDiscardClassLoader() {
-			return discardClassLoader;
-		}
-
 		@Override
 		public boolean equals(Object o) {
 			if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 90a6836..9b04244 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -45,14 +45,14 @@ public class CompletedCheckpointTest {
 
 		// Verify discard call is forwarded to state
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true);
-		checkpoint.discard(ClassLoader.getSystemClassLoader());
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		checkpoint.discardState();
+		verify(state, times(1)).discardState();
 
 		Mockito.reset(state);
 
 		// Verify discard call is not forwarded to state
 		checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false);
-		checkpoint.discard(ClassLoader.getSystemClassLoader());
-		verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+		checkpoint.discardState();
+		verify(state, times(0)).discardState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index d235e61..fd4e02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -66,7 +66,7 @@ public class PendingCheckpointTest {
 		setTaskState(pending, state);
 
 		pending.abortDeclined();
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 
 		// Abort error
 		Mockito.reset(state);
@@ -75,7 +75,7 @@ public class PendingCheckpointTest {
 		setTaskState(pending, state);
 
 		pending.abortError(new Exception("Expected Test Exception"));
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 
 		// Abort expired
 		Mockito.reset(state);
@@ -84,7 +84,7 @@ public class PendingCheckpointTest {
 		setTaskState(pending, state);
 
 		pending.abortExpired();
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 
 		// Abort subsumed
 		Mockito.reset(state);
@@ -93,7 +93,7 @@ public class PendingCheckpointTest {
 		setTaskState(pending, state);
 
 		pending.abortSubsumed();
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 	}
 
 	/**
@@ -106,21 +106,20 @@ public class PendingCheckpointTest {
 		PendingCheckpoint pending = createPendingCheckpoint();
 		PendingCheckpointTest.setTaskState(pending, state);
 
-		pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+		pending.acknowledgeTask(ATTEMPT_ID, null, null);
 
 		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
 
 		// Does discard state
-		checkpoint.discard(ClassLoader.getSystemClassLoader());
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		checkpoint.discardState();
+		verify(state, times(1)).discardState();
 	}
 
 	// ------------------------------------------------------------------------
 
 	private static PendingCheckpoint createPendingCheckpoint() {
-		ClassLoader classLoader = ClassLoader.getSystemClassLoader();
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader);
+		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
index 6ae6e1c..7258545 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
@@ -71,7 +71,7 @@ public class PendingSavepointTest {
 		PendingCheckpointTest.setTaskState(pending, state);
 
 		pending.abortDeclined();
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 
 		// Abort error
 		Mockito.reset(state);
@@ -81,7 +81,7 @@ public class PendingSavepointTest {
 		Future<String> future = pending.getCompletionFuture();
 
 		pending.abortError(new Exception("Expected Test Exception"));
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 		assertTrue(future.failed().isCompleted());
 
 		// Abort expired
@@ -92,7 +92,7 @@ public class PendingSavepointTest {
 		future = pending.getCompletionFuture();
 
 		pending.abortExpired();
-		verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+		verify(state, times(1)).discardState();
 		assertTrue(future.failed().isCompleted());
 
 		// Abort subsumed
@@ -117,13 +117,13 @@ public class PendingSavepointTest {
 
 		Future<String> future = pending.getCompletionFuture();
 
-		pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+		pending.acknowledgeTask(ATTEMPT_ID, null, null);
 
 		CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
 
 		// Does _NOT_ discard state
-		checkpoint.discard(ClassLoader.getSystemClassLoader());
-		verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+		checkpoint.discardState();
+		verify(state, times(0)).discardState();
 
 		// Future is completed
 		String path = Await.result(future, Duration.Zero());
@@ -133,9 +133,8 @@ public class PendingSavepointTest {
 	// ------------------------------------------------------------------------
 
 	private static PendingSavepoint createPendingSavepoint() {
-		ClassLoader classLoader = ClassLoader.getSystemClassLoader();
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingSavepoint(new JobID(), 0, 1, ackTasks, classLoader, new HeapSavepointStore());
+		return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new HeapSavepointStore());
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 380ba2c..f273797 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -29,6 +28,8 @@ import org.junit.Test;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -61,10 +62,10 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception {
 
 		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
-			ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() {
+			ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() {
 			@Override
-			public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
-				return new LocalStateHandle<>(state);
+			public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
+				return new HeapRetrievableStateHandle<CompletedCheckpoint>(state);
 			}
 		});
 	}
@@ -160,4 +161,35 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		CompletedCheckpoint recovered = store.getLatestCheckpoint();
 		assertEquals(checkpoint, recovered);
 	}
+
+	static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+		private static final long serialVersionUID = -268548467968932L;
+
+		public HeapRetrievableStateHandle(T state) {
+			this.state = state;
+		}
+
+		private T state;
+
+		@Override
+		public T retrieveState() throws Exception {
+			return state;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			state = null;
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+			
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
index 6b8c651..3e2de80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
@@ -58,7 +58,7 @@ public class FsSavepointStoreTest {
 		assertEquals(0, tmp.getRoot().listFiles().length);
 
 		// Store
-		SavepointV0 stored = new SavepointV0(1929292, SavepointV0Test.createTaskStates(4, 24));
+		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
 		String path = store.storeSavepoint(stored);
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
@@ -67,7 +67,7 @@ public class FsSavepointStoreTest {
 		assertEquals(stored, loaded);
 
 		// Dispose
-		store.disposeSavepoint(path, ClassLoader.getSystemClassLoader());
+		store.disposeSavepoint(path);
 
 		assertEquals(0, tmp.getRoot().listFiles().length);
 	}
@@ -122,7 +122,7 @@ public class FsSavepointStoreTest {
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
 		// Savepoint v0
-		Savepoint savepoint = new SavepointV0(checkpointId, SavepointV0Test.createTaskStates(4, 32));
+		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
 		String pathSavepoint = store.storeSavepoint(savepoint);
 		assertEquals(2, tmp.getRoot().listFiles().length);
 
@@ -208,7 +208,7 @@ public class FsSavepointStoreTest {
 		}
 
 		@Override
-		public void dispose(ClassLoader classLoader) {
+		public void dispose() {
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6a85195..d703bd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -65,7 +65,7 @@ public class SavepointLoaderTest {
 				true);
 
 		// Store savepoint
-		SavepointV0 savepoint = new SavepointV0(stored.getCheckpointID(), taskStates.values());
+		SavepointV1 savepoint = new SavepointV1(stored.getCheckpointID(), taskStates.values());
 		SavepointStore store = new HeapSavepointStore();
 		String path = store.storeSavepoint(savepoint);
 
@@ -84,8 +84,8 @@ public class SavepointLoaderTest {
 		assertEquals(stored.getCheckpointID(), loaded.getCheckpointID());
 
 		// The loaded checkpoint should not discard state when its discarded
-		loaded.discard(ClassLoader.getSystemClassLoader());
-		verify(state, times(0)).discard(any(ClassLoader.class));
+		loaded.discardState();
+		verify(state, times(0)).discardState();
 
 		// 2) Load and validate: parallelism mismatch
 		when(vertex.getParallelism()).thenReturn(222);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
deleted file mode 100644
index b656d90..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-
-import static org.junit.Assert.assertEquals;
-
-public class SavepointV0SerializerTest {
-
-	/**
-	 * Test serialization of {@link SavepointV0} instance.
-	 */
-	@Test
-	public void testSerializeDeserializeV1() throws Exception {
-		SavepointV0 expected = new SavepointV0(123123, SavepointV0Test.createTaskStates(8, 32));
-
-		SavepointV0Serializer serializer = SavepointV0Serializer.INSTANCE;
-
-		// Serialize
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
-		byte[] bytes = baos.toByteArray();
-
-		// Deserialize
-		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-		Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
deleted file mode 100644
index 4d72c42..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV0Test {
-
-	/**
-	 * Simple test of savepoint methods.
-	 */
-	@Test
-	public void testSavepointV0() throws Exception {
-		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-		int numTaskStates = 4;
-		int numSubtaskStates = 16;
-
-		Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
-
-		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
-		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
-		assertEquals(checkpointId, savepoint.getCheckpointId());
-		assertEquals(expected, savepoint.getTaskStates());
-
-		assertFalse(savepoint.getTaskStates().isEmpty());
-		savepoint.dispose(ClassLoader.getSystemClassLoader());
-		assertTrue(savepoint.getTaskStates().isEmpty());
-	}
-
-	static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
-			for (int j = 0; j < numSubtaskStates; j++) {
-				SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
-						new CheckpointMessagesTest.MyHandle());
-
-				taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
-			}
-
-			taskStates.add(taskState);
-		}
-
-		return taskStates;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
new file mode 100644
index 0000000..bad836b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class SavepointV1SerializerTest {
+
+	/**
+	 * Test serialization of {@link SavepointV1} instance.
+	 */
+	@Test
+	public void testSerializeDeserializeV1() throws Exception {
+		SavepointV1 expected = new SavepointV1(123123, SavepointV1Test.createTaskStates(8, 32));
+
+		SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
+
+		// Serialize
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
+		byte[] bytes = baos.toByteArray();
+
+		// Deserialize
+		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+		Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
new file mode 100644
index 0000000..ef10032
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV1Test {
+
+	/**
+	 * Simple test of savepoint methods.
+	 */
+	@Test
+	public void testSavepointV1() throws Exception {
+		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+		int numTaskStates = 4;
+		int numSubtaskStates = 16;
+
+		Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
+
+		SavepointV1 savepoint = new SavepointV1(checkpointId, expected);
+
+		assertEquals(SavepointV1.VERSION, savepoint.getVersion());
+		assertEquals(checkpointId, savepoint.getCheckpointId());
+		assertEquals(expected, savepoint.getTaskStates());
+
+		assertFalse(savepoint.getTaskStates().isEmpty());
+		savepoint.dispose();
+		assertTrue(savepoint.getTaskStates().isEmpty());
+	}
+
+	static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates, numSubtaskStates);
+			for (int j = 0; j < numSubtaskStates; j++) {
+				StreamStateHandle stateHandle = new ByteStreamStateHandle("Hello".getBytes());
+				taskState.putState(i, new SubtaskState(
+						new ChainedStateHandle<>(Collections.singletonList(stateHandle)), 0));
+			}
+
+			taskState.putKeyedState(
+					0,
+					new KeyGroupsStateHandle(
+							new KeyGroupRangeOffsets(1,1, new long[] {42}), new ByteStreamStateHandle("Hello".getBytes())));
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 12bbf82..c513e26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -19,17 +19,18 @@
 package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -51,7 +52,7 @@ import static org.mockito.Mockito.when;
 public class SimpleCheckpointStatsTrackerTest {
 
 	private static final Random RAND = new Random();
-	
+
 	@Test
 	public void testNoCompletedCheckpointYet() throws Exception {
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
@@ -154,7 +155,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	private static void verifyJobStats(
 			CheckpointStatsTracker tracker,
 			int historySize,
-			CompletedCheckpoint[] checkpoints) {
+			CompletedCheckpoint[] checkpoints) throws Exception {
 
 		assertTrue(tracker.getJobStats().isDefined());
 		JobCheckpointStats jobStats = tracker.getJobStats().get();
@@ -275,14 +276,15 @@ public class SimpleCheckpointStatsTrackerTest {
 	}
 
 	private static CompletedCheckpoint[] generateRandomCheckpoints(
-			int numCheckpoints) throws IOException {
+			int numCheckpoints) throws Exception {
 
 		// Config
 		JobID jobId = new JobID();
 		int minNumOperators = 4;
 		int maxNumOperators = 32;
-		int minParallelism = 4;
-		int maxParallelism = 16;
+		int minOperatorParallelism = 4;
+		int maxOperatorParallelism = 16;
+		int maxParallelism = 32;
 
 		// Use yuge numbers here in order to test that summing up state sizes
 		// does not overflow. This was a bug in the initial version, because
@@ -299,7 +301,7 @@ public class SimpleCheckpointStatsTrackerTest {
 
 		for (int i = 0; i < numOperators; i++) {
 			operatorIds[i] = new JobVertexID();
-			operatorParallelism[i] = RAND.nextInt(maxParallelism - minParallelism + 1) + minParallelism;
+			operatorParallelism[i] = RAND.nextInt(maxOperatorParallelism - minOperatorParallelism + 1) + minOperatorParallelism;
 		}
 
 		// Generate checkpoints
@@ -317,7 +319,7 @@ public class SimpleCheckpointStatsTrackerTest {
 				JobVertexID operatorId = operatorIds[operatorIndex];
 				int parallelism = operatorParallelism[operatorIndex];
 
-				TaskState taskState = new TaskState(operatorId, parallelism);
+				TaskState taskState = new TaskState(operatorId, parallelism, maxParallelism);
 
 				taskGroupStates.put(operatorId, taskState);
 
@@ -328,9 +330,11 @@ public class SimpleCheckpointStatsTrackerTest {
 						completionDuration = duration;
 					}
 
+					final long proxySize = minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize)));
+					StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize);
+
 					SubtaskState subtaskState = new SubtaskState(
-						new SerializedValue<StateHandle<?>>(null),
-						minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize))),
+						new ChainedStateHandle<>(Arrays.asList(proxy)),
 						duration);
 
 					taskState.putState(subtaskIndex, subtaskState);
@@ -356,10 +360,32 @@ public class SimpleCheckpointStatsTrackerTest {
 			ExecutionJobVertex v = mock(ExecutionJobVertex.class);
 			when(v.getJobVertexId()).thenReturn(operatorId);
 			when(v.getParallelism()).thenReturn(parallelism);
-			
+
 			jobVertices.add(v);
 		}
 
 		return jobVertices;
 	}
+
+	private static class StateHandleProxy extends FileStateHandle {
+
+		private static final long serialVersionUID = 35356735683568L;
+
+		public StateHandleProxy(Path filePath, long proxySize) {
+			super(filePath);
+			this.proxySize = proxySize;
+		}
+
+		private long proxySize;
+
+		@Override
+		public void discardState() throws Exception {
+
+		}
+
+		@Override
+		public long getStateSize() {
+			return proxySize;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 0e1c7c5..6b80c3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -53,8 +53,11 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -62,6 +65,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -141,38 +145,38 @@ public class JobManagerHARecoveryTest {
 			instanceManager.addInstanceListener(scheduler);
 
 			archive = system.actorOf(Props.create(
-				MemoryArchivist.class,
-				10), "archive");
+					MemoryArchivist.class,
+					10), "archive");
 
 			Props jobManagerProps = Props.create(
-				TestingJobManager.class,
-				flinkConfiguration,
-				new ForkJoinPool(),
-				instanceManager,
-				scheduler,
-				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
-				archive,
-				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-				timeout,
-				myLeaderElectionService,
-				mySubmittedJobGraphStore,
-				checkpointStateFactory,
-				new HeapSavepointStore(),
-				jobRecoveryTimeout,
-				Option.apply(null));
+					TestingJobManager.class,
+					flinkConfiguration,
+					new ForkJoinPool(),
+					instanceManager,
+					scheduler,
+					new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+					archive,
+					new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+					timeout,
+					myLeaderElectionService,
+					mySubmittedJobGraphStore,
+					checkpointStateFactory,
+					new HeapSavepointStore(),
+					jobRecoveryTimeout,
+					Option.apply(null));
 
 			jobManager = system.actorOf(jobManagerProps, "jobmanager");
 			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
 
 			taskManager = TaskManager.startTaskManagerComponentsAndActor(
-				flinkConfiguration,
-				ResourceID.generate(),
-				system,
-				"localhost",
-				Option.apply("taskmanager"),
-				Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
-				true,
-				TestingTaskManager.class);
+					flinkConfiguration,
+					ResourceID.generate(),
+					system,
+					"localhost",
+					Option.apply("taskmanager"),
+					Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+					true,
+					TestingTaskManager.class);
 
 			ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
 
@@ -199,12 +203,12 @@ public class JobManagerHARecoveryTest {
 			BlockingStatefulInvokable.initializeStaticHelpers(slots);
 
 			Future<Object> isLeader = gateway.ask(
-				TestingJobManagerMessages.getNotifyWhenLeader(),
-				deadline.timeLeft());
+					TestingJobManagerMessages.getNotifyWhenLeader(),
+					deadline.timeLeft());
 
 			Future<Object> isConnectedToJobManager = tmGateway.ask(
-				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
-				deadline.timeLeft());
+					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
+					deadline.timeLeft());
 
 			// tell jobManager that he's the leader
 			myLeaderElectionService.isLeader(leaderSessionID);
@@ -216,8 +220,8 @@ public class JobManagerHARecoveryTest {
 
 			// submit blocking job
 			Future<Object> jobSubmitted = gateway.ask(
-				new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
-				deadline.timeLeft());
+					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+					deadline.timeLeft());
 
 			Await.ready(jobSubmitted, deadline.timeLeft());
 
@@ -298,7 +302,7 @@ public class JobManagerHARecoveryTest {
 		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
 			checkpoints.addLast(checkpoint);
 			if (checkpoints.size() > 1) {
-				checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader());
+				checkpoints.removeFirst().discardState();
 			}
 		}
 
@@ -342,10 +346,12 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void start() {}
+		public void start() {
+		}
 
 		@Override
-		public void stop() {}
+		public void stop() {
+		}
 
 		@Override
 		public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
@@ -408,7 +414,7 @@ public class JobManagerHARecoveryTest {
 
 		@Override
 		public void invoke() throws Exception {
-			while(blocking) {
+			while (blocking) {
 				synchronized (lock) {
 					lock.wait();
 				}
@@ -424,7 +430,7 @@ public class JobManagerHARecoveryTest {
 		}
 	}
 
-	public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask<StateHandle<Long>> {
+	public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask {
 
 		private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
 
@@ -435,18 +441,28 @@ public class JobManagerHARecoveryTest {
 		private int completedCheckpoints = 0;
 
 		@Override
-		public void setInitialState(StateHandle<Long> stateHandle) throws Exception {
+		public void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
 			int subtaskIndex = getIndexInSubtaskGroup();
 			if (subtaskIndex < recoveredStates.length) {
-				recoveredStates[subtaskIndex] = stateHandle.getState(getUserCodeClassLoader());
+				recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(chainedState.get(0).openInputStream());
 			}
 		}
 
 		@Override
 		public boolean triggerCheckpoint(long checkpointId, long timestamp) {
-			StateHandle<Long> state = new LocalStateHandle<>(checkpointId);
-			getEnvironment().acknowledgeCheckpoint(checkpointId, state);
-			return true;
+			try {
+				ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
+						InstantiationUtil.serializeObject(checkpointId));
+				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
+				ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+				getEnvironment().acknowledgeCheckpoint(
+						checkpointId,
+						chainedStateHandle,
+						Collections.<KeyGroupsStateHandle>emptyList());
+				return true;
+			} catch (Exception ex) {
+				throw new RuntimeException(ex);
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 426dfba..6ef184d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -36,6 +37,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -57,10 +59,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
 
-	private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
+	private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
 		@Override
-		public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception {
-			return new LocalStateHandle<>(state);
+		public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException {
+			ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
+					InstantiationUtil.serializeObject(state));
+			return new RetrievableStreamStateHandle<SubmittedJobGraph>(byteStreamStateHandle);
 		}
 	};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 73bf204..c6eb249 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -18,32 +18,38 @@
 
 package org.apache.flink.runtime.messages;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 public class CheckpointMessagesTest {
-	
+
 	@Test
 	public void testTriggerAndConfirmCheckpoint() {
 		try {
 			NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
 			testSerializabilityEqualsHashCode(cc);
-			
+
 			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
 			testSerializabilityEqualsHashCode(tc);
-			
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -55,35 +61,40 @@ public class CheckpointMessagesTest {
 	public void testConfirmTaskCheckpointed() {
 		try {
 			AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
-											new JobID(), new ExecutionAttemptID(), 569345L);
+					new JobID(), new ExecutionAttemptID(), 569345L);
+
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
 
 			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
-											new JobID(), new ExecutionAttemptID(), 87658976143L, 
-											new SerializedValue<StateHandle<?>>(new MyHandle()), 0);
-			
+					new JobID(),
+					new ExecutionAttemptID(),
+					87658976143L,
+					CheckpointCoordinatorTest.generateChainedStateHandle(new MyHandle()),
+					CheckpointCoordinatorTest.generateKeyGroupState(
+							keyGroupRange, Collections.singletonList(new MyHandle())));
+
 			testSerializabilityEqualsHashCode(noState);
 			testSerializabilityEqualsHashCode(withState);
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static void testSerializabilityEqualsHashCode(Serializable o) throws IOException {
 		Object copy = CommonTestUtils.createCopySerializable(o);
+		System.out.println(o.getClass() +" "+copy.getClass());
 		assertEquals(o, copy);
 		assertEquals(o.hashCode(), copy.hashCode());
 		assertNotNull(o.toString());
 		assertNotNull(copy.toString());
 	}
-	
-	public static class MyHandle implements StateHandle<Serializable> {
+
+	public static class MyHandle implements StreamStateHandle {
 
 		private static final long serialVersionUID = 8128146204128728332L;
 
-		@Override
-		public Serializable getState(ClassLoader userCodeClassLoader) {
+		public Serializable get(ClassLoader userCodeClassLoader) {
 			return null;
 		}
 
@@ -107,5 +118,10 @@ public class CheckpointMessagesTest {
 
 		@Override
 		public void close() throws IOException {}
+
+		@Override
+		public FSDataInputStream openInputStream() throws Exception {
+			return null;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 87540bc..19317f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -36,10 +36,13 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -149,10 +152,15 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {}
+	public void acknowledgeCheckpoint(long checkpointId) {
+
+	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
+	public void acknowledgeCheckpoint(long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+	}
 
 	@Override
 	public void failExternally(Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7b966c3..2c76399 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -96,9 +99,13 @@ public class MockEnvironment implements Environment {
 	private final int bufferSize;
 
 	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration());
+	}
+
+	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration) {
 		this.taskInfo = new TaskInfo(taskName, 0, 1, 0);
 		this.jobConfiguration = new Configuration();
-		this.taskConfiguration = new Configuration();
+		this.taskConfiguration = taskConfiguration;
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
 
@@ -298,7 +305,9 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+	public void acknowledgeCheckpoint(long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles) {
 		throw new UnsupportedOperationException();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
index ad3339a..40e1852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
@@ -85,5 +85,15 @@ public class AbstractCloseableHandleTest {
 
 	private static final class CloseableHandle extends AbstractCloseableHandle {
 		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void discardState() throws Exception {
+
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			return 0;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 0f1c0f7..04fa089 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -24,7 +24,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
@@ -86,7 +87,12 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			// no file operations should be possible right now
 			try {
-				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+				FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(
+						2L,
+						System.currentTimeMillis());
+
+				out.write(1);
+				out.closeAndGetHandle();
 				fail("should fail with an exception");
 			} catch (IllegalStateException e) {
 				// supreme!
@@ -114,43 +120,6 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 	}
 
 	@Test
-	public void testSerializableState() {
-		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test-op", IntSerializer.INSTANCE);
-
-			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
-			String state1 = "dummy state";
-			String state2 = "row row row your boat";
-			Integer state3 = 42;
-
-			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
-			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
-			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
-			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
-			handle1.discardState();
-
-			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
-			handle2.discardState();
-
-			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
-			handle3.discardState();
-
-			assertTrue(isDirectoryEmpty(checkpointDir));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			deleteDirectorySilently(tempDir);
-		}
-	}
-
-	@Test
 	public void testStateOutputStream() {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
 		try {
@@ -185,16 +154,16 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 			stream2.write(state2);
 			stream3.write(state3);
 
-			FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle();
+			FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle();
 			ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
 			ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();
 
 			// use with try-with-resources
-			FileStreamStateHandle handle4;
+			StreamStateHandle handle4;
 			try (AbstractStateBackend.CheckpointStateOutputStream stream4 =
 					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
 				stream4.write(state4);
-				handle4 = (FileStreamStateHandle) stream4.closeAndGetHandle();
+				handle4 = stream4.closeAndGetHandle();
 			}
 
 			// close before accessing handle
@@ -209,18 +178,18 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 				// uh-huh
 			}
 
-			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+			validateBytesInStream(handle1.openInputStream(), state1);
 			handle1.discardState();
 			assertFalse(isDirectoryEmpty(checkpointDir));
 			ensureLocalFileDeleted(handle1.getFilePath());
 
-			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+			validateBytesInStream(handle2.openInputStream(), state2);
 			handle2.discardState();
 
-			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+			validateBytesInStream(handle3.openInputStream(), state3);
 			handle3.discardState();
 
-			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+			validateBytesInStream(handle4.openInputStream(), state4);
 			handle4.discardState();
 			assertTrue(isDirectoryEmpty(checkpointDir));
 		}