Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 13:06:29 UTC

[1/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2

Repository: flink
Updated Branches:
  refs/heads/master a6e80da30 -> af3bf837a


http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
new file mode 100644
index 0000000..02365c7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.MemValueState;
+import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class MigrationV0ToV1Test {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Simple test of savepoint methods.
+	 */
+	@Test
+	public void testSavepointMigrationV0ToV1() throws Exception {
+
+		String target = tmp.getRoot().getAbsolutePath();
+
+		assertEquals(0, tmp.getRoot().listFiles().length);
+
+		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+		int numTaskStates = 4;
+		int numSubtaskStates = 16;
+
+		Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
+				createTaskStatesOld(numTaskStates, numSubtaskStates);
+
+		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
+
+		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
+		assertEquals(checkpointId, savepoint.getCheckpointId());
+		assertEquals(expected, savepoint.getOldTaskStates());
+
+		assertFalse(savepoint.getOldTaskStates().isEmpty());
+
+		Exception latestException = null;
+		Path path = null;
+		FSDataOutputStream fdos = null;
+
+		FileSystem fs = null;
+
+		try {
+
+			// Try to create a FS output stream
+			for (int attempt = 0; attempt < 10; attempt++) {
+				path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
+
+				if (fs == null) {
+					fs = FileSystem.get(path.toUri());
+				}
+
+				try {
+					fdos = fs.create(path, false);
+					break;
+				} catch (Exception e) {
+					latestException = e;
+				}
+			}
+
+			if (fdos == null) {
+				throw new IOException("Failed to create file output stream at " + path, latestException);
+			}
+
+			try (DataOutputStream dos = new DataOutputStream(fdos)) {
+				dos.writeInt(SavepointStore.MAGIC_NUMBER);
+				dos.writeInt(savepoint.getVersion());
+				SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
+			}
+
+			ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+			Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl);
+			int t = 0;
+			for (TaskState taskState : sp.getTaskStates()) {
+				for (int p = 0; p < taskState.getParallelism(); ++p) {
+					SubtaskState subtaskState = taskState.getState(p);
+					ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
+					for (int c = 0; c < legacyOperatorState.getLength(); ++c) {
+						StreamStateHandle stateHandle = legacyOperatorState.get(c);
+						try (InputStream is = stateHandle.openInputStream()) {
+							Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c);
+							Tuple4<Integer, Integer, Integer, Integer> actTestState = null;
+							//check function state
+							if (p % 4 != 0) {
+								assertEquals(1, is.read());
+								actTestState = InstantiationUtil.deserializeObject(is, cl);
+								assertEquals(expTestState, actTestState);
+							} else {
+								assertEquals(0, is.read());
+							}
+
+							//check operator state
+							expTestState.f0 = 1;
+							if (p % 3 != 0) {
+								assertEquals(1, is.read());
+								actTestState = InstantiationUtil.deserializeObject(is, cl);
+								assertEquals(expTestState, actTestState);
+							} else {
+								assertEquals(0, is.read());
+							}
+						}
+					}
+
+					//check keyed state
+					KeyGroupsStateHandle keyGroupsStateHandle = subtaskState.getManagedKeyedState();
+					if (t % 3 != 0) {
+						assertEquals(1, keyGroupsStateHandle.getNumberOfKeyGroups());
+						assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
+
+						ByteStreamStateHandle stateHandle =
+								(ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
+						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
+								MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
+
+						assertEquals(2, testKeyedState.size());
+						for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
+							MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
+							byte[] data = castedSnapshot.getData();
+							assertEquals(t, data[0]);
+							assertEquals(p, data[1]);
+						}
+					} else {
+						assertEquals(null, keyGroupsStateHandle);
+					}
+				}
+
+				++t;
+			}
+
+			savepoint.dispose();
+
+		} finally {
+			// Dispose
+			SavepointStore.removeSavepoint(path.toString());
+		}
+	}
+
+	private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(
+			int numTaskStates, int numSubtaskStates) throws Exception {
+
+		List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
+					new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);
+			for (int j = 0; j < numSubtaskStates; j++) {
+
+				StreamTaskState[] streamTaskStates = new StreamTaskState[2];
+
+				for (int k = 0; k < streamTaskStates.length; k++) {
+					StreamTaskState state = new StreamTaskState();
+					Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k);
+					if (j % 4 != 0) {
+						state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
+					}
+					testState = new Tuple4<>(1, i, j, k);
+					if (j % 3 != 0) {
+						state.setOperatorState(new SerializedStateHandle<>(testState));
+					}
+
+					if ((0 == k) && (i % 3 != 0)) {
+						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
+						for (int l = 0; l < 2; ++l) {
+							String name = "keyed-" + l;
+							KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot =
+									new MemValueState.Snapshot<>(
+											IntSerializer.INSTANCE,
+											VoidNamespaceSerializer.INSTANCE,
+											IntSerializer.INSTANCE,
+											new ValueStateDescriptor<>(name, Integer.class, 0),
+											new byte[]{(byte) i, (byte) j});
+							testKeyedState.put(name, testKeyedSnapshot);
+						}
+						state.setKvStates(testKeyedState);
+					}
+					streamTaskStates[k] = state;
+				}
+
+				StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
+				org.apache.flink.migration.util.SerializedValue<
+						org.apache.flink.migration.runtime.state.StateHandle<?>> handle =
+						new org.apache.flink.migration.util.SerializedValue<
+								org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList);
+
+				taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0));
+			}
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index e1b83f4..67575d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -77,8 +77,10 @@ public class SavepointLoaderTest {
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 		tasks.put(vertexId, vertex);
 
+		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
+
 		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 
 		assertEquals(jobId, loaded.getJobId());
 		assertEquals(checkpointId, loaded.getCheckpointID());
@@ -87,7 +89,7 @@ public class SavepointLoaderTest {
 		when(vertex.getMaxParallelism()).thenReturn(222);
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
@@ -97,13 +99,13 @@ public class SavepointLoaderTest {
 		assertNotNull(tasks.remove(vertexId));
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
 		}
 
 		// 4) Load and validate: ignore missing vertex
-		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, true);
+		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 8eed6ea..3398341 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -64,7 +64,7 @@ public class SavepointStoreTest {
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
 		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(path);
+		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
 		assertEquals(stored, loaded);
 
 		loaded.dispose();
@@ -89,7 +89,7 @@ public class SavepointStoreTest {
 		}
 
 		try {
-			SavepointStore.loadSavepoint(filePath.toString());
+			SavepointStore.loadSavepoint(filePath.toString(), Thread.currentThread().getContextClassLoader());
 			fail("Did not throw expected Exception");
 		} catch (RuntimeException e) {
 			assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
@@ -128,10 +128,10 @@ public class SavepointStoreTest {
 		assertEquals(2, tmp.getRoot().listFiles().length);
 
 		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint);
+		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
 		assertEquals(newSavepoint, loaded);
 
-		loaded = SavepointStore.loadSavepoint(pathSavepoint);
+		loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader());
 		assertEquals(savepoint, loaded);
 	}
 
@@ -176,7 +176,7 @@ public class SavepointStoreTest {
 		}
 
 		@Override
-		public TestSavepoint deserialize(DataInputStream dis) throws IOException {
+		public TestSavepoint deserialize(DataInputStream dis, ClassLoader userCL) throws IOException {
 			int version = dis.readInt();
 			long checkpointId = dis.readLong();
 			return new TestSavepoint(version, checkpointId);

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
index 508a69d..58cf1aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -50,7 +49,9 @@ public class SavepointV1SerializerTest {
 
 			// Deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-			Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+			Savepoint actual = serializer.deserialize(
+					new DataInputViewStreamWrapper(bais),
+					Thread.currentThread().getContextClassLoader());
 
 			assertEquals(expected, actual);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
new file mode 100644
index 0000000..dd34f03
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class MultiStreamStateHandleTest {
+
+	private static final int TEST_DATA_LENGTH = 123;
+	private Random random;
+	private byte[] testData;
+	private List<StreamStateHandle> streamStateHandles;
+
+	@Before
+	public void setup() {
+		random = new Random(0x42);
+		testData = new byte[TEST_DATA_LENGTH];
+		for (int i = 0; i < testData.length; ++i) {
+			testData[i] = (byte) i;
+		}
+
+		int idx = 0;
+		streamStateHandles = new ArrayList<>();
+		while (idx < testData.length) {
+			int len = random.nextInt(5);
+			byte[] sub = Arrays.copyOfRange(testData, idx, idx + len);
+			streamStateHandles.add(new ByteStreamStateHandle(String.valueOf(idx), sub));
+			idx += len;
+		}
+	}
+
+	@Test
+	public void testMetaData() throws IOException {
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+		assertEquals(TEST_DATA_LENGTH, multiStreamStateHandle.getStateSize());
+	}
+
+	@Test
+	public void testLinearRead() throws IOException {
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			for (int i = 0; i < TEST_DATA_LENGTH; ++i) {
+				assertEquals(i, in.getPos());
+				assertEquals(testData[i], in.read());
+			}
+
+			assertEquals(-1, in.read());
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+			assertEquals(-1, in.read());
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+		}
+	}
+
+	@Test
+	public void testRandomRead() throws IOException {
+
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			for (int i = 0; i < 1000; ++i) {
+				int pos = random.nextInt(TEST_DATA_LENGTH);
+				int readLen = random.nextInt(TEST_DATA_LENGTH);
+				in.seek(pos);
+				while (--readLen > 0 && pos < TEST_DATA_LENGTH) {
+					assertEquals(pos, in.getPos());
+					assertEquals(testData[pos++], in.read());
+				}
+			}
+
+			in.seek(TEST_DATA_LENGTH);
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+			assertEquals(-1, in.read());
+
+			try {
+				in.seek(TEST_DATA_LENGTH + 1);
+				fail();
+			} catch (Exception ignored) {
+
+			}
+		}
+	}
+
+	@Test
+	public void testEmptyList() throws IOException {
+
+		MultiStreamStateHandle multiStreamStateHandle =
+				new MultiStreamStateHandle(Collections.<StreamStateHandle>emptyList());
+
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			assertEquals(0, in.getPos());
+			in.seek(0);
+			assertEquals(0, in.getPos());
+			assertEquals(-1, in.read());
+
+			try {
+				in.seek(1);
+				fail();
+			} catch (Exception ignored) {
+
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 68a50d3..a3d31f5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -310,7 +310,11 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case RequestSavepoint(savepointPath) =>
       try {
-        val savepoint = SavepointStore.loadSavepoint(savepointPath)
+        // TODO: should this use the user code class loader?
+        val savepoint = SavepointStore.loadSavepoint(
+          savepointPath,
+          Thread.currentThread().getContextClassLoader)
+        
         sender ! ResponseSavepoint(savepoint)
       }
       catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fac37c2..54f6c10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -640,7 +640,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		for (int chainIdx = 0; chainIdx < allOperators.length; ++chainIdx) {
 			StreamOperator<?> operator = allOperators[chainIdx];
 			if (null != operator) {
-				if (restored) {
+				if (restored && restoreStateHandles != null) {
 					operator.initializeState(new OperatorStateHandles(restoreStateHandles, chainIdx));
 				} else {
 					operator.initializeState(null);


[3/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2

Posted by uc...@apache.org.
[FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2

This addresses Savepoint, TaskState, StateHandles, and KeyedStateBackends.

This closes #2781.
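
For illustration, here is a condensed sketch of the round trip that MigrationV0ToV1Test (earlier in this message) exercises: write a savepoint file in the old 1.1 (V0) format and read it back through the class-loader-aware SavepointStore.loadSavepoint(). The class name and package placement are assumptions made for this sketch; the constants and calls are the ones used by the test.

package org.apache.flink.runtime.checkpoint.savepoint;
// Same package as the test above so that package-visible members such as MAGIC_NUMBER resolve.

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;

import java.io.DataOutputStream;

public class SavepointCompatSketch {

	/** Writes a 1.1-format (V0) savepoint file and loads it back as a current Savepoint. */
	public static Savepoint roundTrip(SavepointV0 oldSavepoint, String targetDir) throws Exception {
		Path path = new Path(targetDir, "savepoint-v0");
		FileSystem fs = FileSystem.get(path.toUri());

		try (FSDataOutputStream out = fs.create(path, false);
				DataOutputStream dos = new DataOutputStream(out)) {
			dos.writeInt(SavepointStore.MAGIC_NUMBER);   // file header: magic number ...
			dos.writeInt(oldSavepoint.getVersion());     // ... followed by the format version (V0 here)
			SavepointV0Serializer.INSTANCE.serializeOld(oldSavepoint, dos);
		}

		// loadSavepoint() dispatches on the stored version; V0 (Flink 1.1) task states are
		// converted to the current TaskState/SubtaskState representation while loading.
		ClassLoader cl = Thread.currentThread().getContextClassLoader();
		return SavepointStore.loadSavepoint(path.toString(), cl);
	}
}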


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3bf837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3bf837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3bf837

Branch: refs/heads/master
Commit: af3bf837af766f93b1ab21e8e2f6f8fc5fdac6a6
Parents: a6e80da
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Nov 1 12:29:01 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 14:05:12 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  96 ++++-
 .../streaming/state/RocksDBStateBackend.java    |  67 +++
 flink-core/pom.xml                              |   2 +-
 .../core/fs/AbstractMultiFSDataInputStream.java | 113 ++++++
 .../memory/ByteArrayInputStreamWithPos.java     | 118 ++++++
 .../memory/ByteArrayOutputStreamWithPos.java    | 220 ++--------
 .../util/MigrationInstantiationUtil.java        |  82 ++++
 .../flink/migration/util/SerializedValue.java   |  95 +++++
 .../apache/flink/util/InstantiationUtil.java    |   2 +-
 .../apache/flink/migration/MigrationUtil.java   |  34 ++
 .../runtime/checkpoint/KeyGroupState.java       |  84 ++++
 .../runtime/checkpoint/SubtaskState.java        | 104 +++++
 .../migration/runtime/checkpoint/TaskState.java | 160 ++++++++
 .../checkpoint/savepoint/SavepointV0.java       |  98 +++++
 .../savepoint/SavepointV0Serializer.java        | 404 +++++++++++++++++++
 .../runtime/state/AbstractCloseableHandle.java  | 128 ++++++
 .../runtime/state/AbstractStateBackend.java     |  68 ++++
 .../runtime/state/KvStateSnapshot.java          |  28 ++
 .../migration/runtime/state/StateHandle.java    |  38 ++
 .../migration/runtime/state/StateObject.java    |  55 +++
 .../runtime/state/StreamStateHandle.java        |  37 ++
 .../filesystem/AbstractFileStateHandle.java     |  99 +++++
 .../filesystem/AbstractFsStateSnapshot.java     |  87 ++++
 .../filesystem/FileSerializableStateHandle.java |  73 ++++
 .../state/filesystem/FileStreamStateHandle.java |  84 ++++
 .../state/filesystem/FsFoldingState.java        |  40 ++
 .../runtime/state/filesystem/FsListState.java   |  42 ++
 .../state/filesystem/FsReducingState.java       |  40 ++
 .../state/filesystem/FsStateBackend.java        |  50 +++
 .../runtime/state/filesystem/FsValueState.java  |  40 ++
 .../state/memory/AbstractMemStateSnapshot.java  | 136 +++++++
 .../state/memory/ByteStreamStateHandle.java     |  85 ++++
 .../runtime/state/memory/MemFoldingState.java   |  38 ++
 .../runtime/state/memory/MemListState.java      |  41 ++
 .../runtime/state/memory/MemReducingState.java  |  45 +++
 .../runtime/state/memory/MemValueState.java     |  45 +++
 .../state/memory/SerializedStateHandle.java     |  92 +++++
 .../state/MigrationKeyGroupStateHandle.java     |  43 ++
 .../state/MigrationStreamStateHandle.java       |  56 +++
 .../runtime/tasks/StreamTaskState.java          |  85 ++++
 .../runtime/tasks/StreamTaskStateList.java      |  96 +++++
 .../checkpoint/savepoint/SavepointLoader.java   |   4 +-
 .../savepoint/SavepointSerializer.java          |   3 +-
 .../savepoint/SavepointSerializers.java         |   5 +-
 .../checkpoint/savepoint/SavepointStore.java    |   6 +-
 .../savepoint/SavepointV1Serializer.java        |   6 +-
 .../state/AbstractKeyedStateBackend.java        |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  96 +++++
 .../state/heap/HeapKeyedStateBackend.java       | 143 ++++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  18 +-
 .../savepoint/MigrationV0ToV1Test.java          | 249 ++++++++++++
 .../savepoint/SavepointLoaderTest.java          |  10 +-
 .../savepoint/SavepointStoreTest.java           |  10 +-
 .../savepoint/SavepointV1SerializerTest.java    |   5 +-
 .../state/MultiStreamStateHandleTest.java       | 135 +++++++
 .../testingUtils/TestingJobManagerLike.scala    |   6 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 57 files changed, 3823 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bc5b17d..4db622d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -31,11 +31,15 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -43,6 +47,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -62,8 +67,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -205,8 +212,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		try {
-			RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
-			restoreOperation.doRestore(restoreState);
+			if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
+				LOG.info("Converting RocksDB state from old savepoint.");
+				restoreOldSavepointKeyedState(restoreState);
+			} else {
+				RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+				restoreOperation.doRestore(restoreState);
+			}
 		} catch (Exception ex) {
 			dispose();
 			throw ex;
@@ -1068,4 +1080,84 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public File getInstanceBasePath() {
 		return instanceBasePath;
 	}
+
+	/**
+	 * For backwards compatibility, remove again later!
+	 */
+	@Deprecated
+	private void restoreOldSavepointKeyedState(Collection<KeyGroupsStateHandle> restoreState) throws Exception {
+
+		if (restoreState.isEmpty()) {
+			return;
+		}
+
+		Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
+		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates =
+				InstantiationUtil.deserializeObject(restoreState.iterator().next().openInputStream(), userCodeClassLoader);
+
+		Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
+		DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
+
+		// clear k/v state information before filling it
+		kvStateInformation.clear();
+
+		// first get the column family mapping
+		int numColumns = inputView.readInt();
+		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+		for (int i = 0; i < numColumns; i++) {
+			byte mappingByte = inputView.readByte();
+
+			ObjectInputStream ooIn =
+					new InstantiationUtil.ClassLoaderObjectInputStream(
+							new DataInputViewStream(inputView), userCodeClassLoader);
+
+			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+
+			columnFamilyMapping.put(mappingByte, stateDescriptor);
+
+			// this will fill in the k/v state information
+			getColumnFamily(stateDescriptor);
+		}
+
+		// try and read until EOF
+		try {
+			// the EOFException will get us out of this...
+			while (true) {
+				byte mappingByte = inputView.readByte();
+				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+
+				ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
+
+				K reconstructedKey = keySerializer.deserialize(new DataInputViewStreamWrapper(bis));
+				int len = bis.getPosition();
+
+				int keyGroup = (byte) KeyGroupRangeAssignment.assignToKeyGroup(reconstructedKey, numberOfKeyGroups);
+
+				if (keyGroupPrefixBytes == 1) {
+					// copy and override one byte (42) between key and namespace
+					System.arraycopy(keyAndNamespace, 0, keyAndNamespace, 1, len);
+					keyAndNamespace[0] = (byte) keyGroup;
+				} else {
+					byte[] largerKey = new byte[1 + keyAndNamespace.length];
+
+					// write key-group
+					largerKey[0] = (byte) ((keyGroup >> 8) & 0xFF);
+					largerKey[1] = (byte) (keyGroup & 0xFF);
+
+					// write key
+					System.arraycopy(keyAndNamespace, 0, largerKey, 2, len);
+
+					//skip one byte (42), write namespace
+					System.arraycopy(keyAndNamespace, 1 + len, largerKey, 2 + len, keyAndNamespace.length - len - 1);
+					keyAndNamespace = largerKey;
+				}
+
+				byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+				db.put(handle, keyAndNamespace, value);
+			}
+		} catch (EOFException e) {
+			// expected
+		}
+	}
 }
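
As a side note to the restore logic above, here is an equivalent, copy-based restatement of the key rewrite (illustrative only, not part of the patch): a 1.1-style RocksDB key with layout [key | 0x2A separator | namespace] becomes [key-group prefix (1 or 2 bytes) | key | namespace] in the 1.2 layout.

final class LegacyKeyRewriteSketch {

	/** Copy-based version of the in-place rewrite done in restoreOldSavepointKeyedState(). */
	static byte[] rewriteLegacyKey(byte[] keyAndNamespace, int keyLen, int keyGroup, int keyGroupPrefixBytes) {
		// old layout: [ key (keyLen bytes) | 0x2A | namespace ]
		// new layout: [ key-group prefix  | key  | namespace ]
		byte[] out = new byte[keyGroupPrefixBytes + keyAndNamespace.length - 1];

		if (keyGroupPrefixBytes == 1) {
			out[0] = (byte) keyGroup;
		} else {
			out[0] = (byte) ((keyGroup >> 8) & 0xFF);
			out[1] = (byte) (keyGroup & 0xFF);
		}

		// key bytes
		System.arraycopy(keyAndNamespace, 0, out, keyGroupPrefixBytes, keyLen);
		// namespace bytes, skipping the single 0x2A separator byte of the old format
		System.arraycopy(keyAndNamespace, keyLen + 1, out, keyGroupPrefixBytes + keyLen,
				keyAndNamespace.length - keyLen - 1);

		return out;
	}
}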

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
new file mode 100644
index 0000000..509eb4c
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+@Deprecated
+public class RocksDBStateBackend extends AbstractStateBackend {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database.
+	 */
+	public static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>> {
+		private static final long serialVersionUID = 1L;
+
+		public final StateHandle<DataInputView> stateHandle;
+		final long checkpointId;
+
+		/**
+		 * Creates a new snapshot from the given state parameters.
+		 */
+		private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
+			this.stateHandle = requireNonNull(stateHandle);
+			this.checkpointId = checkpointId;
+		}
+
+		@Override
+		public final void discardState() throws Exception {
+			stateHandle.discardState();
+		}
+
+		@Override
+		public final long getStateSize() throws Exception {
+			return stateHandle.getStateSize();
+		}
+
+		@Override
+		public void close() throws IOException {
+			stateHandle.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index cfa2cbb..ffbfe70 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -112,7 +112,7 @@ under the License.
 			<artifactId>joda-convert</artifactId>
 			<scope>test</scope>
 		</dependency>
-	</dependencies>
+    </dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
new file mode 100644
index 0000000..88c0092
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Abstract base class for wrappers over multiple {@link FSDataInputStream} instances, presenting all inner
+ * streams as one contiguous stream that supports reading and seeking across them.
+ */
+public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
+
+	/** Inner stream for the currently accessed segment of the virtual global stream */
+	protected FSDataInputStream delegate;
+
+	/** Position in the virtual global stream */
+	protected long totalPos;
+
+	/** Total available bytes in the virtual global stream */
+	protected long totalAvailable;
+
+	public AbstractMultiFSDataInputStream() {
+		this.totalPos = 0L;
+	}
+
+	@Override
+	public void seek(long desired) throws IOException {
+
+		if (desired == totalPos) {
+			return;
+		}
+
+		Preconditions.checkArgument(desired >= 0L);
+
+		if (desired > totalAvailable) {
+			throw new EOFException();
+		}
+
+		IOUtils.closeQuietly(delegate);
+		delegate = getSeekedStreamForOffset(desired);
+
+		this.totalPos = desired;
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return totalPos;
+	}
+
+	@Override
+	public int read() throws IOException {
+
+		if (null == delegate) {
+			return -1;
+		}
+
+		int val = delegate.read();
+
+		if (-1 == val) {
+			IOUtils.closeQuietly(delegate);
+			if (totalPos < totalAvailable) {
+				delegate = getSeekedStreamForOffset(totalPos);
+			} else {
+				delegate = null;
+			}
+			return read();
+		}
+
+		++totalPos;
+		return val;
+	}
+
+	@Override
+	public void close() throws IOException {
+		IOUtils.closeQuietly(delegate);
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		seek(totalPos + n);
+		return n;
+	}
+
+	/**
+	 * Delivers the right stream for the given global stream offset. The returned stream is already seeked to the
+	 * local offset that corresponds to the global offset.
+	 *
+	 * @param globalStreamOffset the global offset to which we seek
+	 * @return a sub-stream, seeked to the correct local offset w.r.t. the global offset.
+	 * @throws IOException
+	 */
+	protected abstract FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException;
+}
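
A minimal concrete subclass for illustration (the class name and the ByteStreamStateHandle-based bookkeeping are assumptions of this sketch; the general-purpose implementation added by this commit is MultiStreamStateHandle in flink-runtime): it concatenates a list of byte-array backed handles and implements getSeekedStreamForOffset() by locating the part that covers the requested global offset.

import org.apache.flink.core.fs.AbstractMultiFSDataInputStream;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.io.IOException;
import java.util.List;

class ConcatenatingInputStream extends AbstractMultiFSDataInputStream {

	private final List<ByteStreamStateHandle> parts;
	private final long[] startOffsets;

	ConcatenatingInputStream(List<ByteStreamStateHandle> parts) throws IOException {
		this.parts = parts;
		this.startOffsets = new long[parts.size()];

		long offset = 0L;
		for (int i = 0; i < parts.size(); i++) {
			startOffsets[i] = offset;
			offset += parts.get(i).getData().length;
		}

		this.totalAvailable = offset;                               // protected field of the base class
		this.delegate = offset > 0 ? getSeekedStreamForOffset(0L) : null;
	}

	@Override
	protected FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException {
		// pick the last part whose start offset is <= the requested global offset
		int idx = 0;
		while (idx + 1 < parts.size() && startOffsets[idx + 1] <= globalStreamOffset) {
			idx++;
		}

		FSDataInputStream in = parts.get(idx).openInputStream();
		in.seek(globalStreamOffset - startOffsets[idx]);            // translate global to local offset
		return in;
	}
}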

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
new file mode 100644
index 0000000..c25f491
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Un-synchronized stream similar to Java's ByteArrayInputStream that also exposes the current position.
+ */
+public class ByteArrayInputStreamWithPos extends InputStream {
+
+	protected byte[] buffer;
+	protected int position;
+	protected int count;
+	protected int mark = 0;
+
+	public ByteArrayInputStreamWithPos(byte[] buffer) {
+		this(buffer, 0, buffer.length);
+	}
+
+	public ByteArrayInputStreamWithPos(byte[] buffer, int offset, int length) {
+		this.position = offset;
+		this.buffer = buffer;
+		this.mark = offset;
+		this.count = Math.min(buffer.length, offset + length);
+	}
+
+	@Override
+	public int read() {
+		return (position < count) ? 0xFF & (buffer[position++]) : -1;
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) {
+		Preconditions.checkNotNull(b);
+
+		if (off < 0 || len < 0 || len > b.length - off) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		if (position >= count) {
+			return -1; // signal EOF
+		}
+
+		int available = count - position;
+
+		if (len > available) {
+			len = available;
+		}
+
+		if (len <= 0) {
+			return 0;
+		}
+
+		System.arraycopy(buffer, position, b, off, len);
+		position += len;
+		return len;
+	}
+
+	@Override
+	public long skip(long toSkip) {
+		long remain = count - position;
+
+		if (toSkip < remain) {
+			remain = toSkip < 0 ? 0 : toSkip;
+		}
+
+		position += remain;
+		return remain;
+	}
+
+	@Override
+	public boolean markSupported() {
+		return true;
+	}
+
+	@Override
+	public void mark(int readAheadLimit) {
+		mark = position;
+	}
+
+	@Override
+	public void reset() {
+		position = mark;
+	}
+
+	@Override
+	public int available() {
+		return count - position;
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+
+	public int getPosition() {
+		return position;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 285e016..df5b34a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,260 +22,96 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 
 /**
- * Un-synchronized copy of Java's ByteArrayOutputStream that also exposes the current position.
+ * Un-synchronized stream similar to Java's ByteArrayOutputStream that also exposes the current position.
  */
 public class ByteArrayOutputStreamWithPos extends OutputStream {
 
-	/**
-	 * The buffer where data is stored.
-	 */
-	protected byte[] buf;
-
-	/**
-	 * The number of valid bytes in the buffer.
-	 */
+	protected byte[] buffer;
 	protected int count;
 
-	/**
-	 * Creates a new byte array output stream. The buffer capacity is
-	 * initially 32 bytes, though its size increases if necessary.
-	 */
 	public ByteArrayOutputStreamWithPos() {
-		this(32);
+		this(64);
 	}
 
-	/**
-	 * Creates a new byte array output stream, with a buffer capacity of
-	 * the specified size, in bytes.
-	 *
-	 * @param size the initial size.
-	 * @throws IllegalArgumentException if size is negative.
-	 */
 	public ByteArrayOutputStreamWithPos(int size) {
-		if (size < 0) {
-			throw new IllegalArgumentException("Negative initial size: "
-					+ size);
-		}
-		buf = new byte[size];
+		Preconditions.checkArgument(size >= 0);
+		buffer = new byte[size];
 	}
 
-	/**
-	 * Increases the capacity if necessary to ensure that it can hold
-	 * at least the number of elements specified by the minimum
-	 * capacity argument.
-	 *
-	 * @param minCapacity the desired minimum capacity
-	 * @throws OutOfMemoryError if {@code minCapacity < 0}.  This is
-	 *                          interpreted as a request for the unsatisfiably large capacity
-	 *                          {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
-	 */
-	private void ensureCapacity(int minCapacity) {
-		// overflow-conscious code
-		if (minCapacity - buf.length > 0) {
-			grow(minCapacity);
+	private void ensureCapacity(int requiredCapacity) {
+		if (requiredCapacity - buffer.length > 0) {
+			increaseCapacity(requiredCapacity);
 		}
 	}
 
-	/**
-	 * Increases the capacity to ensure that it can hold at least the
-	 * number of elements specified by the minimum capacity argument.
-	 *
-	 * @param minCapacity the desired minimum capacity
-	 */
-	private void grow(int minCapacity) {
-		// overflow-conscious code
-		int oldCapacity = buf.length;
+	private void increaseCapacity(int requiredCapacity) {
+		int oldCapacity = buffer.length;
 		int newCapacity = oldCapacity << 1;
-		if (newCapacity - minCapacity < 0) {
-			newCapacity = minCapacity;
+		if (newCapacity - requiredCapacity < 0) {
+			newCapacity = requiredCapacity;
 		}
 		if (newCapacity < 0) {
-			if (minCapacity < 0) { // overflow
+			if (requiredCapacity < 0) {
 				throw new OutOfMemoryError();
 			}
 			newCapacity = Integer.MAX_VALUE;
 		}
-		buf = Arrays.copyOf(buf, newCapacity);
+		buffer = Arrays.copyOf(buffer, newCapacity);
 	}
 
-	/**
-	 * Writes the specified byte to this byte array output stream.
-	 *
-	 * @param b the byte to be written.
-	 */
+	@Override
 	public void write(int b) {
 		ensureCapacity(count + 1);
-		buf[count] = (byte) b;
-		count += 1;
+		buffer[count] = (byte) b;
+		++count;
 	}
 
-	/**
-	 * Writes <code>len</code> bytes from the specified byte array
-	 * starting at offset <code>off</code> to this byte array output stream.
-	 *
-	 * @param b   the data.
-	 * @param off the start offset in the data.
-	 * @param len the number of bytes to write.
-	 */
+	@Override
 	public void write(byte[] b, int off, int len) {
-		if ((off < 0) || (off > b.length) || (len < 0) ||
+		if ((off < 0) || (len < 0) || (off > b.length) ||
 				((off + len) - b.length > 0)) {
 			throw new IndexOutOfBoundsException();
 		}
+
 		ensureCapacity(count + len);
-		System.arraycopy(b, off, buf, count, len);
-		count += len;
-	}
 
-	/**
-	 * Writes the complete contents of this byte array output stream to
-	 * the specified output stream argument, as if by calling the output
-	 * stream's write method using <code>out.write(buf, 0, count)</code>.
-	 *
-	 * @param out the output stream to which to write the data.
-	 * @throws IOException if an I/O error occurs.
-	 */
-	public void writeTo(OutputStream out) throws IOException {
-		out.write(buf, 0, count);
+		System.arraycopy(b, off, buffer, count, len);
+		count += len;
 	}
 
-	/**
-	 * Resets the <code>count</code> field of this byte array output
-	 * stream to zero, so that all currently accumulated output in the
-	 * output stream is discarded. The output stream can be used again,
-	 * reusing the already allocated buffer space.
-	 *
-	 * @see java.io.ByteArrayInputStream#count
-	 */
 	public void reset() {
 		count = 0;
 	}
 
-	/**
-	 * Creates a newly allocated byte array. Its size is the current
-	 * size of this output stream and the valid contents of the buffer
-	 * have been copied into it.
-	 *
-	 * @return the current contents of this output stream, as a byte array.
-	 * @see java.io.ByteArrayOutputStream#size()
-	 */
 	public byte toByteArray()[] {
-		return Arrays.copyOf(buf, count);
+		return Arrays.copyOf(buffer, count);
 	}
 
-	/**
-	 * Returns the current size of the buffer.
-	 *
-	 * @return the value of the <code>count</code> field, which is the number
-	 * of valid bytes in this output stream.
-	 * @see java.io.ByteArrayOutputStream#count
-	 */
 	public int size() {
 		return count;
 	}
 
-	/**
-	 * Converts the buffer's contents into a string decoding bytes using the
-	 * platform's default character set. The length of the new <tt>String</tt>
-	 * is a function of the character set, and hence may not be equal to the
-	 * size of the buffer.
-	 * <p>
-	 * <p> This method always replaces malformed-input and unmappable-character
-	 * sequences with the default replacement string for the platform's
-	 * default character set. The {@linkplain java.nio.charset.CharsetDecoder}
-	 * class should be used when more control over the decoding process is
-	 * required.
-	 *
-	 * @return String decoded from the buffer's contents.
-	 * @since JDK1.1
-	 */
 	public String toString() {
-		return new String(buf, 0, count);
+		return new String(buffer, 0, count);
 	}
 
-	/**
-	 * Converts the buffer's contents into a string by decoding the bytes using
-	 * the named {@link java.nio.charset.Charset charset}. The length of the new
-	 * <tt>String</tt> is a function of the charset, and hence may not be equal
-	 * to the length of the byte array.
-	 * <p>
-	 * <p> This method always replaces malformed-input and unmappable-character
-	 * sequences with this charset's default replacement string. The {@link
-	 * java.nio.charset.CharsetDecoder} class should be used when more control
-	 * over the decoding process is required.
-	 *
-	 * @param charsetName the name of a supported
-	 *                    {@link java.nio.charset.Charset charset}
-	 * @return String decoded from the buffer's contents.
-	 * @throws UnsupportedEncodingException If the named charset is not supported
-	 * @since JDK1.1
-	 */
-	public String toString(String charsetName)
-			throws UnsupportedEncodingException {
-		return new String(buf, 0, count, charsetName);
+	private int getEndPosition() {
+		return buffer.length;
 	}
 
-	/**
-	 * Creates a newly allocated string. Its size is the current size of
-	 * the output stream and the valid contents of the buffer have been
-	 * copied into it. Each character <i>c</i> in the resulting string is
-	 * constructed from the corresponding element <i>b</i> in the byte
-	 * array such that:
-	 * <blockquote><pre>
-	 *     c == (char)(((hibyte &amp; 0xff) &lt;&lt; 8) | (b &amp; 0xff))
-	 * </pre></blockquote>
-	 *
-	 * @param hibyte the high byte of each resulting Unicode character.
-	 * @return the current contents of the output stream, as a string.
-	 * @see java.io.ByteArrayOutputStream#size()
-	 * @see java.io.ByteArrayOutputStream#toString(String)
-	 * @see java.io.ByteArrayOutputStream#toString()
-	 * @deprecated This method does not properly convert bytes into characters.
-	 * As of JDK&nbsp;1.1, the preferred way to do this is via the
-	 * <code>toString(String enc)</code> method, which takes an encoding-name
-	 * argument, or the <code>toString()</code> method, which uses the
-	 * platform's default character encoding.
-	 */
-	@Deprecated
-	public String toString(int hibyte) {
-		return new String(buf, hibyte, 0, count);
-	}
-
-	/**
-	 * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
-	 * this class can be called after the stream has been closed without
-	 * generating an <tt>IOException</tt>.
-	 */
-	public void close() throws IOException {
-	}
-
-	/**
-	 * Returns the read/write offset position for the stream.
-	 * @return the current position in the stream.
-	 */
 	public int getPosition() {
 		return count;
 	}
 
-	/**
-	 * Sets the read/write offset position for the stream.
-	 *
-	 * @param position the position to which the offset in the stream shall be set. Must be < getEndPosition
-	 */
 	public void setPosition(int position) {
 		Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");
 		count = position;
 	}
 
-	/**
-	 * Returns the size of the internal buffer, which is the current end position for all setPosition calls.
-	 * @return size of the internal buffer
-	 */
-	public int getEndPosition() {
-		return buf.length;
+	@Override
+	public void close() throws IOException {
 	}
 }
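
A small, hypothetical usage example of the two position-aware streams (the demo class and variable names are illustrative): getPosition()/setPosition() on the output stream allow writing a placeholder byte and patching it later, while the input counterpart exposes how far reading has progressed.

import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;

public class PositionAwareStreamDemo {

	public static void main(String[] args) {
		ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
		out.write(new byte[]{1, 2, 3}, 0, 3);

		int placeholderPos = out.getPosition();  // remember where the placeholder goes
		out.write(0);                            // placeholder byte, patched below
		out.write(new byte[]{7, 8}, 0, 2);

		int endPos = out.getPosition();
		out.setPosition(placeholderPos);         // jump back and patch the placeholder
		out.write(42);
		out.setPosition(endPos);                 // restore the end position

		ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.toByteArray());
		in.skip(3);
		System.out.println(in.read());           // prints 42 (the patched byte)
		System.out.println(in.getPosition());    // prints 4
	}
}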

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
new file mode 100644
index 0000000..b6c354e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.util;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Utility class to deserialize legacy classes for migration.
+ */
+public final class MigrationInstantiationUtil {
+
+	public static class ClassLoaderObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+
+		public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+			super(in, classLoader);
+		}
+
+		@Override
+		protected ObjectStreamClass readClassDescriptor()
+				throws IOException, ClassNotFoundException {
+			ObjectStreamClass objectStreamClass = super.readClassDescriptor();
+			String className = objectStreamClass.getName();
+			if (className.contains("apache.flink.")) {
+				className = className.replace("apache.flink.", "apache.flink.migration.");
+				try {
+					Class<?> clazz = Class.forName(className, false, classLoader);
+					objectStreamClass = ObjectStreamClass.lookup(clazz);
+				} catch (Exception ignored) {
+
+				}
+			}
+			return objectStreamClass;
+		}
+	}
+	
+	public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
+		return deserializeObject(new ByteArrayInputStream(bytes), cl);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOException, ClassNotFoundException {
+		final ClassLoader old = Thread.currentThread().getContextClassLoader();
+		try (ObjectInputStream oois = new ClassLoaderObjectInputStream(in, cl)) {
+			Thread.currentThread().setContextClassLoader(cl);
+			return (T) oois.readObject();
+		} finally {
+			Thread.currentThread().setContextClassLoader(old);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private MigrationInstantiationUtil() {
+		throw new IllegalAccessError();
+	}
+
+}
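
A minimal, self-contained sketch (not taken from this commit; the String payload and the
class loader choice are arbitrary) of how the stream above behaves: objects are written with
the regular utility and read back through the migration-aware stream, whose
readClassDescriptor() would remap any org.apache.flink.* class descriptor to its
org.apache.flink.migration.* counterpart if one exists, silently falling back otherwise.

    import org.apache.flink.migration.util.MigrationInstantiationUtil;
    import org.apache.flink.util.InstantiationUtil;

    public class MigrationDeserializationSketch {
        public static void main(String[] args) throws Exception {
            // Serialize with the regular utility ...
            byte[] bytes = InstantiationUtil.serializeObject("legacy payload");
            // ... and deserialize through the migration-aware object input stream.
            // (A plain String is not remapped; Flink classes would be.)
            String restored = MigrationInstantiationUtil.deserializeObject(
                    bytes, MigrationDeserializationSketch.class.getClassLoader());
            System.out.println(restored);
        }
    }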

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
new file mode 100644
index 0000000..aab68c9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.util;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class is used to transfer (via serialization) objects whose classes are not available
+ * in the system class loader. When those objects are deserialized without access to their
+ * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
+ *
+ * To work around that issue, the SerializedValue serializes the data immediately into a byte array.
+ * When sent through RPC or another service that uses serialization, only the byte array is
+ * transferred. The object is deserialized later (upon access) and requires the accessor to
+ * provide the corresponding class loader.
+ *
+ * @param <T> The type of the value held.
+ */
+@Deprecated
+public class SerializedValue<T> implements java.io.Serializable {
+
+	private static final long serialVersionUID = -3564011643393683761L;
+
+	/** The serialized data */
+	private final byte[] serializedData;
+
+	private SerializedValue(byte[] serializedData) {
+		this.serializedData = serializedData;
+	}
+
+	public SerializedValue(T value) throws IOException {
+		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+	}
+
+	@SuppressWarnings("unchecked")
+	public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
+		return serializedData == null ? null : (T) MigrationInstantiationUtil.deserializeObject(serializedData, loader);
+	}
+
+	/**
+	 * Returns the serialized value or <code>null</code> if no value is set.
+	 *
+	 * @return Serialized data.
+	 */
+	public byte[] getByteArray() {
+		return serializedData;
+	}
+
+	public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
+		return new SerializedValue<T>(serializedData);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return serializedData == null ? 0 : Arrays.hashCode(serializedData);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof SerializedValue) {
+			SerializedValue<?> other = (SerializedValue<?>) obj;
+			return this.serializedData == null ? other.serializedData == null :
+					(other.serializedData != null && Arrays.equals(this.serializedData, other.serializedData));
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "SerializedValue";
+	}
+}
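
A short usage sketch of the holder above (the Long payload and the class loader are
illustrative assumptions): the value is serialized eagerly in the constructor, only the byte
array travels, and the object is rebuilt once a suitable class loader is supplied.

    import org.apache.flink.migration.util.SerializedValue;

    public class SerializedValueSketch {
        public static void main(String[] args) throws Exception {
            SerializedValue<Long> wrapped = new SerializedValue<>(42L);
            byte[] onTheWire = wrapped.getByteArray();        // what RPC would carry
            SerializedValue<Long> received = SerializedValue.fromBytes(onTheWire);
            Long value = received.deserializeValue(SerializedValueSketch.class.getClassLoader());
            System.out.println(value);                        // 42
        }
    }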

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index cd5c91a..ffb5a7d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -51,7 +51,7 @@ public final class InstantiationUtil {
 	 */
 	public static class ClassLoaderObjectInputStream extends ObjectInputStream {
 
-		private final ClassLoader classLoader;
+		protected final ClassLoader classLoader;
 
 		public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
 			super(in);

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
new file mode 100644
index 0000000..76d4eef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration;
+
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+
+import java.util.Collection;
+
+public class MigrationUtil {
+
+	public static boolean isOldSavepointKeyedState(Collection<KeyGroupsStateHandle> keyGroupsStateHandles) {
+		return (keyGroupsStateHandles != null)
+				&& (keyGroupsStateHandles.size() == 1)
+				&& (keyGroupsStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
new file mode 100644
index 0000000..ad94993
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Simple container class which contains the serialized state handle for a key group.
+ *
+ * The key group state handle is kept in serialized form because it can contain user code classes
+ * which might not be available on the JobManager.
+ */
+@Deprecated
+public class KeyGroupState implements Serializable {
+	private static final long serialVersionUID = -5926696455438467634L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KeyGroupState.class);
+
+	private final SerializedValue<StateHandle<?>> keyGroupState;
+
+	private final long stateSize;
+
+	private final long duration;
+
+	public KeyGroupState(SerializedValue<StateHandle<?>> keyGroupState, long stateSize, long duration) {
+		this.keyGroupState = keyGroupState;
+
+		this.stateSize = stateSize;
+
+		this.duration = duration;
+	}
+
+	public SerializedValue<StateHandle<?>> getKeyGroupState() {
+		return keyGroupState;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof KeyGroupState) {
+			KeyGroupState other = (KeyGroupState) obj;
+
+			return keyGroupState.equals(other.keyGroupState) && stateSize == other.stateSize &&
+				duration == other.duration;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return (int) (this.stateSize ^ this.stateSize >>> 32) +
+			31 * ((int) (this.duration ^ this.duration >>> 32) +
+				31 * keyGroupState.hashCode());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
new file mode 100644
index 0000000..df886b3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Deprecated
+public class SubtaskState implements Serializable {
+
+	private static final long serialVersionUID = -2394696997971923995L;
+
+	/** The state of the parallel operator */
+	private final SerializedValue<StateHandle<?>> state;
+
+	/**
+	 * The state size. This is also part of the deserialized state handle.
+	 * We store it here in order to not deserialize the state handle when
+	 * gathering stats.
+	 */
+	private final long stateSize;
+
+	/** The duration of the acknowledged (ack timestamp - trigger timestamp). */
+	private final long duration;
+	
+	public SubtaskState(
+			SerializedValue<StateHandle<?>> state,
+			long stateSize,
+			long duration) {
+
+		this.state = checkNotNull(state, "State");
+		// Sanity check and don't fail checkpoint because of this.
+		this.stateSize = stateSize >= 0 ? stateSize : 0;
+
+		this.duration = duration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public SerializedValue<StateHandle<?>> getState() {
+		return state;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	public void discard(ClassLoader userClassLoader) throws Exception {
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof SubtaskState) {
+			SubtaskState that = (SubtaskState) o;
+			return this.state.equals(that.state) && stateSize == that.stateSize &&
+				duration == that.duration;
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return (int) (this.stateSize ^ this.stateSize >>> 32) +
+			31 * ((int) (this.duration ^ this.duration >>> 32) +
+				31 * state.hashCode());
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
new file mode 100644
index 0000000..798c112
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+@Deprecated
+public class TaskState implements Serializable {
+
+	private static final long serialVersionUID = -4845578005863201810L;
+
+	private final JobVertexID jobVertexID;
+
+	/** Map of task states which can be accessed by their sub task index */
+	private final Map<Integer, SubtaskState> subtaskStates;
+
+	/** Map of key-value states which can be accessed by their key group index */
+	private final Map<Integer, KeyGroupState> kvStates;
+
+	/** Parallelism of the operator when it was checkpointed */
+	private final int parallelism;
+
+	public TaskState(JobVertexID jobVertexID, int parallelism) {
+		this.jobVertexID = jobVertexID;
+
+		this.subtaskStates = new HashMap<>(parallelism);
+
+		this.kvStates = new HashMap<>();
+
+		this.parallelism = parallelism;
+	}
+
+	public JobVertexID getJobVertexID() {
+		return jobVertexID;
+	}
+
+	public void putState(int subtaskIndex, SubtaskState subtaskState) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + parallelism);
+		} else {
+			subtaskStates.put(subtaskIndex, subtaskState);
+		}
+	}
+
+	public SubtaskState getState(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + parallelism);
+		} else {
+			return subtaskStates.get(subtaskIndex);
+		}
+	}
+
+	public Collection<SubtaskState> getStates() {
+		return subtaskStates.values();
+	}
+
+	public Map<Integer, SubtaskState> getSubtaskStatesById() {
+		return subtaskStates;
+	}
+
+	public long getStateSize() {
+		long result = 0L;
+
+		for (SubtaskState subtaskState : subtaskStates.values()) {
+			result += subtaskState.getStateSize();
+		}
+
+		for (KeyGroupState keyGroupState : kvStates.values()) {
+			result += keyGroupState.getStateSize();
+		}
+
+		return result;
+	}
+
+	public int getNumberCollectedStates() {
+		return subtaskStates.size();
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
+		kvStates.put(keyGroupId, keyGroupState);
+	}
+
+	public KeyGroupState getKvState(int keyGroupId) {
+		return kvStates.get(keyGroupId);
+	}
+
+	/**
+	 * Retrieve the set of key-value state key groups specified by the given key group partition set.
+	 * The key groups are returned as a map where the key group index maps to the serialized state
+	 * handle of the key group.
+	 *
+	 * @param keyGroupPartition Set of key group indices
+	 * @return Map of serialized key group state handles indexed by their key group index.
+	 */
+	public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
+		HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
+
+		for (Integer keyGroupId : keyGroupPartition) {
+			KeyGroupState keyGroupState = kvStates.get(keyGroupId);
+
+			if (keyGroupState != null) {
+				result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
+			}
+		}
+
+		return result;
+	}
+
+	public int getNumberCollectedKvStates() {
+		return kvStates.size();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof TaskState) {
+			TaskState other = (TaskState) obj;
+
+			return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
+				subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
+	}
+}
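
A hedged sketch of getUnwrappedKvStates(...) as described in its Javadoc: only key groups
for which a handle was registered appear in the result. The parallelism, indices and the
null-valued handle below are arbitrary illustration values, not part of this commit.

    import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
    import org.apache.flink.migration.runtime.checkpoint.TaskState;
    import org.apache.flink.migration.runtime.state.StateHandle;
    import org.apache.flink.migration.util.SerializedValue;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class UnwrapKvStatesSketch {
        public static void main(String[] args) throws Exception {
            TaskState taskState = new TaskState(new JobVertexID(), 4);
            // Register a (null-valued) serialized handle for key group 1 only.
            taskState.putKvState(1, new KeyGroupState(new SerializedValue<StateHandle<?>>(null), 0L, 0L));

            Set<Integer> partition = new HashSet<>(Arrays.asList(0, 1, 2));
            Map<Integer, SerializedValue<StateHandle<?>>> handles =
                    taskState.getUnwrappedKvStates(partition);
            System.out.println(handles.keySet()); // [1] -- only registered groups are returned
        }
    }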

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
new file mode 100644
index 0000000..8aa562e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * Savepoint version 0.
+ *
+ * <p>This format was introduced with Flink 1.1.0.
+ */
+public class SavepointV0 implements Savepoint {
+
+	/** The savepoint version. */
+	public static final int VERSION = 0;
+
+	/** The checkpoint ID */
+	private final long checkpointId;
+
+	/** The task states */
+	private final Collection<TaskState> taskStates;
+
+	public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
+		this.checkpointId = checkpointId;
+		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	@Override
+	public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dispose() throws Exception {
+		//NOP
+	}
+
+
+	public Collection<TaskState> getOldTaskStates() {
+		return taskStates;
+	}
+
+	@Override
+	public String toString() {
+		return "Savepoint(version=" + VERSION + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SavepointV0 that = (SavepointV0) o;
+		return checkpointId == that.checkpointId && taskStates.equals(that.taskStates);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (checkpointId ^ (checkpointId >>> 32));
+		result = 31 * result + taskStates.hashCode();
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
new file mode 100644
index 0000000..e4125e5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
+import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.migration.state.MigrationStreamStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.MultiStreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Deserializer for savepoints written in the Flink 1.1 format ({@link SavepointV0}),
+ * converting them into the current {@link SavepointV1} representation on the fly.
+ *
+ * <p>In contrast to later savepoint versions, the 1.1 format relies on default Java
+ * serialization for the contained state handles. Deserialization therefore needs the
+ * user code class loader and resolves legacy classes via the relocated
+ * org.apache.flink.migration.* packages.
+ */
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
+
+	public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+	private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
+	private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1});
+
+	private static final int MAX_SIZE = 4 * 1024 * 1024;
+
+	private SavepointV0Serializer() {
+	}
+
+
+	@Override
+	public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+		throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
+	}
+
+	@Override
+	public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
+
+		long checkpointId = dis.readLong();
+
+		// Task states
+		int numTaskStates = dis.readInt();
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.readFully(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				SubtaskState subtaskState = new SubtaskState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+
+			// Key group states
+			int numKvStates = dis.readInt();
+			for (int j = 0; j < numKvStates; j++) {
+				int keyGroupIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.readFully(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				KeyGroupState keyGroupState = new KeyGroupState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putKvState(keyGroupIndex, keyGroupState);
+			}
+		}
+
+		try {
+			return convertSavepoint(taskStates, userClassLoader, checkpointId);
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+	}
+
+	private SavepointV1 convertSavepoint(
+			List<TaskState> taskStates,
+			ClassLoader userClassLoader,
+			long checkpointID) throws Exception {
+
+		List<org.apache.flink.runtime.checkpoint.TaskState> newTaskStates = new ArrayList<>(taskStates.size());
+
+		for (TaskState taskState : taskStates) {
+			newTaskStates.add(convertTaskState(taskState, userClassLoader, checkpointID));
+		}
+
+		return new SavepointV1(checkpointID, newTaskStates);
+	}
+
+	private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(
+			TaskState taskState,
+			ClassLoader userClassLoader,
+			long checkpointID) throws Exception {
+
+		JobVertexID jobVertexID = taskState.getJobVertexID();
+		int parallelism = taskState.getParallelism();
+		int chainLength = determineOperatorChainLength(taskState, userClassLoader);
+
+		org.apache.flink.runtime.checkpoint.TaskState newTaskState =
+				new org.apache.flink.runtime.checkpoint.TaskState(
+						jobVertexID,
+						parallelism,
+						parallelism,
+						chainLength);
+
+		if (chainLength > 0) {
+
+			Map<Integer, SubtaskState> subtaskStates = taskState.getSubtaskStatesById();
+
+			for (Map.Entry<Integer, SubtaskState> subtaskState : subtaskStates.entrySet()) {
+				int parallelInstanceIdx = subtaskState.getKey();
+				newTaskState.putState(parallelInstanceIdx, convertSubtaskState(
+						subtaskState.getValue(),
+						parallelInstanceIdx,
+						userClassLoader,
+						checkpointID));
+			}
+		}
+
+		return newTaskState;
+	}
+
+	private org.apache.flink.runtime.checkpoint.SubtaskState convertSubtaskState(
+			SubtaskState subtaskState,
+			int parallelInstanceIdx,
+			ClassLoader userClassLoader,
+			long checkpointID) throws Exception {
+
+		SerializedValue<StateHandle<?>> serializedValue = subtaskState.getState();
+
+		StreamTaskStateList stateList = (StreamTaskStateList) serializedValue.deserializeValue(userClassLoader);
+		StreamTaskState[] streamTaskStates = stateList.getState(userClassLoader);
+
+		List<StreamStateHandle> newChainStateList = Arrays.asList(new StreamStateHandle[streamTaskStates.length]);
+		KeyGroupsStateHandle newKeyedState = null;
+
+		for (int chainIdx = 0; chainIdx < streamTaskStates.length; ++chainIdx) {
+
+			StreamTaskState streamTaskState = streamTaskStates[chainIdx];
+			if (streamTaskState == null) {
+				continue;
+			}
+
+			newChainStateList.set(chainIdx, convertOperatorAndFunctionState(streamTaskState));
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState = streamTaskState.getKvStates();
+
+			if (null != oldKeyedState) {
+				Preconditions.checkState(null == newKeyedState, "Found more than one keyed state in chain");
+				newKeyedState = convertKeyedBackendState(oldKeyedState, parallelInstanceIdx, checkpointID);
+			}
+		}
+
+		ChainedStateHandle<StreamStateHandle> newChainedState = new ChainedStateHandle<>(newChainStateList);
+		ChainedStateHandle<OperatorStateHandle> nopChain =
+				new ChainedStateHandle<>(Arrays.asList(new OperatorStateHandle[newChainedState.getLength()]));
+
+		return new org.apache.flink.runtime.checkpoint.SubtaskState(
+				newChainedState,
+				nopChain,
+				nopChain,
+				newKeyedState,
+				null);
+	}
+
+	private StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
+
+		List<StreamStateHandle> mergeStateHandles = new ArrayList<>(4);
+
+		StateHandle<Serializable> functionState = streamTaskState.getFunctionState();
+		StateHandle<?> operatorState = streamTaskState.getOperatorState();
+
+		if (null != functionState) {
+			mergeStateHandles.add(SIGNAL_1);
+			mergeStateHandles.add(convertStateHandle(functionState));
+		} else {
+			mergeStateHandles.add(SIGNAL_0);
+		}
+
+		if (null != operatorState) {
+			mergeStateHandles.add(SIGNAL_1);
+			mergeStateHandles.add(convertStateHandle(operatorState));
+		} else {
+			mergeStateHandles.add(SIGNAL_0);
+		}
+
+		return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
+	}
+
+	private KeyGroupsStateHandle convertKeyedBackendState(
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState,
+			int parallelInstanceIdx,
+			long checkpointID) throws Exception {
+
+		if (null != oldKeyedState) {
+
+			CheckpointStreamFactory checkpointStreamFactory = new MemCheckpointStreamFactory(MAX_SIZE);
+
+			CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
+					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
+
+			final long offset = keyedStateOut.getPos();
+
+			InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
+			StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+
+			if (null != streamStateHandle) {
+				KeyGroupRangeOffsets keyGroupRangeOffsets =
+						new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
+
+				return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+			}
+		}
+		return null;
+	}
+
+	private int determineOperatorChainLength(
+			TaskState taskState,
+			ClassLoader userClassLoader) throws IOException, ClassNotFoundException {
+
+		Collection<SubtaskState> subtaskStates = taskState.getStates();
+
+		if (subtaskStates == null || subtaskStates.isEmpty()) {
+			return 0;
+		}
+
+		SubtaskState firstSubtaskState = subtaskStates.iterator().next();
+		Object toCastTaskStateList = firstSubtaskState.getState().deserializeValue(userClassLoader);
+
+		if (toCastTaskStateList instanceof StreamTaskStateList) {
+			StreamTaskStateList taskStateList = (StreamTaskStateList) toCastTaskStateList;
+			StreamTaskState[] streamTaskStates = taskStateList.getState(userClassLoader);
+
+			return streamTaskStates.length;
+		}
+		return 0;
+	}
+
+	private static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
+		if (oldStateHandle instanceof AbstractFileStateHandle) {
+			Path path = ((AbstractFileStateHandle) oldStateHandle).getFilePath();
+			return new FileStateHandle(path, oldStateHandle.getStateSize());
+		} else if (oldStateHandle instanceof SerializedStateHandle) {
+			byte[] data = ((SerializedStateHandle<?>) oldStateHandle).getSerializedData();
+			return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+		} else if (oldStateHandle instanceof org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) {
+			byte[] data =
+					((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
+			return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+		}
+		throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
+	}
+
+	@VisibleForTesting
+	public void serializeOld(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
+		dos.writeLong(savepoint.getCheckpointId());
+
+		Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = savepoint.getOldTaskStates();
+		dos.writeInt(taskStates.size());
+
+		for (org.apache.flink.migration.runtime.checkpoint.TaskState taskState : savepoint.getOldTaskStates()) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+
+			// Sub task states
+			dos.writeInt(taskState.getNumberCollectedStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				SubtaskState subtaskState = taskState.getState(i);
+
+				if (subtaskState != null) {
+					dos.writeInt(i);
+
+					SerializedValue<?> serializedValue = subtaskState.getState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(subtaskState.getStateSize());
+					dos.writeLong(subtaskState.getDuration());
+				}
+			}
+
+			// Key group states
+			dos.writeInt(taskState.getNumberCollectedKvStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				KeyGroupState keyGroupState = taskState.getKvState(i);
+
+				if (keyGroupState != null) {
+					dos.writeInt(i);
+
+					SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(keyGroupState.getStateSize());
+					dos.writeLong(keyGroupState.getDuration());
+				}
+			}
+		}
+	}
+}
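
A hedged end-to-end sketch of driving the deserializer above (the file name and class loader
are placeholders, not something this commit provides, and the surrounding savepoint
header/magic-number handling is omitted): the stream is read in the 1.1 layout and comes
back already converted to the 1.2 representation.

    import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;

    import java.io.DataInputStream;
    import java.io.FileInputStream;

    public class ReadOldSavepointSketch {
        public static void main(String[] args) throws Exception {
            // "savepoint-v0-file" is a hypothetical path to a Flink 1.1 savepoint body.
            try (DataInputStream dis =
                         new DataInputStream(new FileInputStream("savepoint-v0-file"))) {
                SavepointV1 converted = SavepointV0Serializer.INSTANCE.deserialize(
                        dis, ReadOldSavepointSketch.class.getClassLoader());
                System.out.println("Restored checkpoint " + converted.getCheckpointId()
                        + " with " + converted.getTaskStates().size() + " task states");
            }
        }
    }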

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
new file mode 100644
index 0000000..873dab8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * A simple base class for closeable handles.
+ *
+ * <p>Allows registering a stream (or other closeable object) to which close calls are delegated
+ * when the handle is closed or has already been closed.
+ */
+@Deprecated
+public abstract class AbstractCloseableHandle implements Closeable, Serializable {
+
+	/** Serial Version UID must be constant to maintain format compatibility */
+	private static final long serialVersionUID = 1L;
+
+	/** Updater to atomically set the "isClosed" field without adding an extra member object like an "AtomicBoolean" */
+	private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER = 
+			AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
+
+	// ------------------------------------------------------------------------
+
+	/** The closeable to close if this handle is closed late */ 
+	private transient volatile Closeable toClose;
+
+	/** Flag to remember if this handle was already closed */
+	@SuppressWarnings("unused") // this field is actually updated, but via the "CLOSER" updater
+	private transient volatile int isClosed;
+
+	// ------------------------------------------------------------------------
+
+	protected final void registerCloseable(Closeable toClose) throws IOException {
+		if (toClose == null) {
+			return;
+		}
+		
+		// NOTE: The order of operations matters here:
+		// (1) first setting the closeable
+		// (2) checking the flag.
+		// Because the order in the {@link #close()} method is the opposite, and
+		// both variables are volatile (reordering barriers), we can be sure that
+		// one of the methods always notices the effect of a concurrent call to the
+		// other method.
+
+		this.toClose = toClose;
+
+		// check if we were closed early
+		if (this.isClosed != 0) {
+			toClose.close();
+			throw new IOException("handle is closed");
+		}
+	}
+
+	/**
+	 * Closes the handle.
+	 * 
+	 * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
+	 * then it will be closed.
+	 * 
+	 * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
+	 * it will immediately be closed and that method will throw an exception.
+	 * 
+	 * @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
+	 *                     are forwarded.
+	 * 
+	 * @see #registerCloseable(Closeable)
+	 */
+	@Override
+	public final void close() throws IOException {
+		// NOTE: The order of operations matters here:
+		// (1) first setting the closed flag
+		// (2) checking whether there is already a closeable
+		// Because the order in the {@link #registerCloseable(Closeable)} method is the opposite, and
+		// both variables are volatile (reordering barriers), we can be sure that
+		// one of the methods always notices the effect of a concurrent call to the
+		// other method.
+
+		if (CLOSER.compareAndSet(this, 0, 1)) {
+			final Closeable toClose = this.toClose;
+			if (toClose != null) {
+				this.toClose = null;
+				toClose.close();
+			}
+		}
+	}
+
+	/**
+	 * Checks whether this handle has been closed.
+	 * 
+	 * @return True if the handle is closed, false otherwise.
+	 */
+	public boolean isClosed() {
+		return isClosed != 0;
+	}
+
+	/**
+	 * This method checks whether the handle is closed and throws an exception if it is closed.
+	 * If the handle is not closed, this method does nothing.
+	 * 
+	 * @throws IOException Thrown, if the handle has been closed.
+	 */
+	public void ensureNotClosed() throws IOException {
+		if (isClosed != 0) {
+			throw new IOException("handle is closed");
+		}
+	}
+}
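
A small sketch of the intended usage pattern (the FileInputStream is an arbitrary stand-in
for whatever resource a concrete handle opens, and the class itself is hypothetical): the
subclass registers the stream it hands out, so closing the handle also closes the stream,
and registering after the handle was closed fails fast.

    import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class FileBackedHandleSketch extends AbstractCloseableHandle {

        private static final long serialVersionUID = 1L;

        public InputStream openStream(String path) throws IOException {
            FileInputStream in = new FileInputStream(path);
            // Delegate future close() calls on this handle to the stream;
            // throws immediately if the handle was already closed.
            registerCloseable(in);
            return in;
        }
    }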

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
new file mode 100644
index 0000000..b7932f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A state backend defines how state is stored and snapshotted during checkpoints.
+ */
+@Deprecated
+public abstract class AbstractStateBackend implements Serializable {
+	
+	private static final long serialVersionUID = 4620413814639220247L;
+
+	/**
+	 * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
+	 */
+	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+
+		private static final long serialVersionUID = 2891559813513532079L;
+
+		private final StreamStateHandle stream;
+
+		private DataInputViewHandle(StreamStateHandle stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
+			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			return stream.getStateSize();
+		}
+
+		@Override
+		public void close() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
new file mode 100644
index 0000000..9936ca7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+
+@Deprecated
+public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
+		extends StateObject {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
new file mode 100644
index 0000000..97d6984
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+/**
+ * StateHandle is a general handle interface meant to abstract operator state fetching.
+ * A StateHandle implementation can, for example, include the state itself in cases where the
+ * state is lightweight, or fetch it lazily from some external storage when the state is too large.
+ */
+@Deprecated
+public interface StateHandle<T> extends StateObject {
+
+	/**
+	 * Retrieves and returns the state represented by the handle.
+	 *
+	 * @param userCodeClassLoader Class loader for deserializing user code specific classes
+	 *
+	 * @return The state represented by the handle.
+	 * @throws Exception Thrown, if the state cannot be fetched.
+	 */
+	T getState(ClassLoader userCodeClassLoader) throws Exception;
+}
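
To illustrate the "state included directly" case mentioned in the Javadoc above, a
hypothetical in-memory implementation might look like this (names and behaviour are
assumptions for illustration, not part of this commit):

    import org.apache.flink.migration.runtime.state.StateHandle;

    public class InMemoryStateHandleSketch implements StateHandle<byte[]> {

        private static final long serialVersionUID = 1L;

        private final byte[] data;

        public InMemoryStateHandleSketch(byte[] data) {
            this.data = data;
        }

        @Override
        public byte[] getState(ClassLoader userCodeClassLoader) {
            return data;              // state is held directly, nothing to fetch
        }

        @Override
        public void discardState() {
            // nothing to release for heap-held state
        }

        @Override
        public long getStateSize() {
            return data.length;
        }

        @Override
        public void close() {
            // no open resources
        }
    }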

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
new file mode 100644
index 0000000..2f1048f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+/**
+ * Base of all types that represent checkpointed state. Specializations are for
+ * example {@link StateHandle StateHandles} (directly resolve to state) and 
+ * {@link KvStateSnapshot key/value state snapshots}.
+ * 
+ * <p>State objects define how to:
+ * <ul>
+ *     <li><b>Discard State</b>: The {@link #discardState()} method defines how state is permanently
+ *         disposed/deleted. After that method call, state may not be recoverable any more.</li>
+ *
+ *     <li><b>Close the current state access</b>: The {@link #close()} method defines how to
+ *         stop the current access or recovery to the state. Called for example when an operation is
+ *         canceled during recovery.</li>
+ * </ul>
+ */
+@Deprecated
+public interface StateObject extends java.io.Closeable, java.io.Serializable {
+
+	/**
+	 * Discards the state referred to by this handle, to free up resources in
+	 * the persistent storage. This method is called when the handle will not be
+	 * used any more.
+	 */
+	void discardState() throws Exception;
+
+	/**
+	 * Returns the size of the state in bytes.
+	 *
+	 * <p>If the size is not known, return {@code 0}.
+	 *
+	 * @return Size of the state in bytes.
+	 * @throws Exception If the operation fails during size retrieval.
+	 */
+	long getStateSize() throws Exception;
+}


[2/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2

Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
new file mode 100644
index 0000000..55f9b58
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that produces an input stream when resolved.
+ */
+@Deprecated
+public interface StreamStateHandle extends StateHandle<InputStream> {
+
+	/**
+	 * Converts this stream state handle into a state handle that de-serializes
+	 * the stream into an object using Java's serialization mechanism.
+	 *
+	 * @return The state handle that automatically de-serializes.
+	 */
+	<T extends Serializable> StateHandle<T> toSerializableHandle();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
new file mode 100644
index 0000000..25a0e89
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateObject;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+@Deprecated
+public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject {
+
+	private static final long serialVersionUID = 350284443258002355L;
+
+	/** The path to the file in the filesystem, fully describing the file system */
+	private final Path filePath;
+
+	/** Cached file system handle */
+	private transient FileSystem fs;
+
+	/**
+	 * Creates a new file state for the given file path.
+	 * 
+	 * @param filePath The path to the file that stores the state.
+	 */
+	protected AbstractFileStateHandle(Path filePath) {
+		this.filePath = checkNotNull(filePath);
+	}
+
+	/**
+	 * Gets the path where this handle's state is stored.
+	 * @return The path where this handle's state is stored.
+	 */
+	public Path getFilePath() {
+		return filePath;
+	}
+
+	/**
+	 * Discard the state by deleting the file that stores the state. If the parent directory
+	 * of the state is empty after deleting the state file, it is also deleted.
+	 * 
+	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+	 */
+	@Override
+	public void discardState() throws Exception {
+		getFileSystem().delete(filePath, false);
+
+		// send a call to delete the checkpoint directory containing the file. This will
+		// fail (and be ignored) when some files still exist
+		try {
+			getFileSystem().delete(filePath.getParent(), false);
+		} catch (IOException ignored) {}
+	}
+
+	/**
+	 * Gets the file system that stores the file state.
+	 * @return The file system that stores the file state.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected FileSystem getFileSystem() throws IOException {
+		if (fs == null) {
+			fs = FileSystem.get(filePath.toUri());
+		}
+		return fs;
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected long getFileSize() throws IOException {
+		return getFileSystem().getFileStatus(filePath).getLen();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
new file mode 100644
index 0000000..59c373b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <N> The type of the namespace in the snapshot state.
+ * @param <SV> The type of the state value.
+ */
+@Deprecated
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
+		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	public AbstractFsStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		Path filePath) {
+		super(filePath);
+		this.stateDesc = stateDesc;
+		this.keySerializer = keySerializer;
+		this.stateSerializer = stateSerializer;
+		this.namespaceSerializer = namespaceSerializer;
+
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<SV> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public SD getStateDesc() {
+		return stateDesc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
new file mode 100644
index 0000000..ef908f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that points to state stored in a file via Java Serialization.
+ * 
+ * @param <T> The type of state pointed to by the state handle.
+ */
+@Deprecated
+public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileStateHandle implements StateHandle<T> {
+
+	private static final long serialVersionUID = -657631394290213622L;
+
+	/**
+	 * Creates a new FileSerializableStateHandle pointing to state at the given file path.
+	 * 
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileSerializableStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public T getState(ClassLoader classLoader) throws Exception {
+		ensureNotClosed();
+
+		try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
+			// make sure any deserialization can be aborted
+			registerCloseable(inStream);
+
+			ObjectInputStream ois = new MigrationInstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+			return (T) ois.readObject();
+		}
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
new file mode 100644
index 0000000..89ff4c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that points to state in a file system, accessible as an input stream.
+ */
+@Deprecated
+public class FileStreamStateHandle extends AbstractFileStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -6826990484549987311L;
+
+	/**
+	 * Creates a new FileStreamStateHandle pointing to state at the given file path.
+	 * 
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileStreamStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+		ensureNotClosed();
+
+		InputStream inStream = getFileSystem().open(getFilePath());
+		// make sure the state handle is cancelable
+		registerCloseable(inStream);
+
+		return inStream; 
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+
+	@Override
+	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+		FileSerializableStateHandle<T> handle = new FileSerializableStateHandle<>(getFilePath());
+
+		// forward closed status
+		if (isClosed()) {
+			try {
+				handle.close();
+			} catch (IOException e) {
+				// should not happen on a fresh handle, but forward anyways
+				throw new RuntimeException(e);
+			}
+		}
+
+		return handle;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
new file mode 100644
index 0000000..e1bac83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsFoldingState<K, N, T, ACC> {
+	public static class Snapshot<K, N, T, ACC> extends AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
new file mode 100644
index 0000000..d4e3d4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+import java.util.ArrayList;
+
+@Deprecated
+public class FsListState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
new file mode 100644
index 0000000..5cd9505
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsReducingState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ReducingStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
new file mode 100644
index 0000000..e964ec9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public class FsStateBackend extends AbstractStateBackend {
+
+	private static final long serialVersionUID = -8191916350224044011L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
+
+	/** By default, state smaller than 1024 bytes will not be written to files, but
+	 * will be stored directly with the metadata */
+	public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+	/** Maximum size of state that is stored with the metadata, rather than in files */
+	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+	
+	/** Default size for the write buffer */
+	private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+	
+
+	/** The path to the directory for the checkpoint data, including the file system
+	 * description via scheme and optional authority */
+	private final Path basePath = null;
+
+	/** State below this size will be stored as part of the metadata, rather than in files */
+	private final int fileStateThreshold = 0;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
new file mode 100644
index 0000000..3b432a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsValueState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ValueStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
new file mode 100644
index 0000000..3336556
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Deprecated
+public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
+		implements KvStateSnapshot<K, N, S, SD> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	/** The serialized data of the state key/value pairs */
+	private final byte[] data;
+	
+	private transient boolean closed;
+
+	/**
+	 * Creates a new heap memory state snapshot.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateSerializer The serializer for the elements in the state HashMap
+	 * @param stateDesc The state identifier
+	 * @param data The serialized data of the state key/value pairs
+	 */
+	public AbstractMemStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		byte[] data) {
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+		this.stateDesc = stateDesc;
+		this.data = data;
+	}
+
+	public HashMap<N, Map<K, SV>> deserialize() throws IOException {
+		DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+
+		final int numKeys = inView.readInt();
+		HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
+
+		for (int i = 0; i < numKeys && !closed; i++) {
+			N namespace = namespaceSerializer.deserialize(inView);
+			final int numValues = inView.readInt();
+			Map<K, SV> namespaceMap = new HashMap<>(numValues);
+			stateMap.put(namespace, namespaceMap);
+			for (int j = 0; j < numValues; j++) {
+				K key = keySerializer.deserialize(inView);
+				SV value = stateSerializer.deserialize(inView);
+				namespaceMap.put(key, value);
+			}
+		}
+		return stateMap;
+	}
+
+	/**
+	 * Discarding the heap state is a no-op.
+	 */
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return data.length;
+	}
+
+	@Override
+	public void close() {
+		closed = true;
+	}
+
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<SV> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public byte[] getData() {
+		return data;
+	}
+
+	@Override
+	public String toString() {
+		return "AbstractMemStateSnapshot{" +
+				"keySerializer=" + keySerializer +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				", stateDesc=" + stateDesc +
+				'}';
+	}
+}
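
The deserialize() method above implies a simple layout for the snapshot bytes: the number of namespaces, then per namespace the namespace itself, the entry count, and the key/value pairs. The following write-side sketch mirrors that layout; it is not part of this change and assumes DataOutputViewStreamWrapper from flink-core as the counterpart of the DataInputDeserializer used above.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    /** Hypothetical writer producing bytes in the layout that deserialize() above reads back. */
    class MemSnapshotLayoutExample {

        static <K, N, SV> byte[] serialize(
                Map<N, Map<K, SV>> stateMap,
                TypeSerializer<K> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                TypeSerializer<SV> stateSerializer) throws IOException {

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes);

            // number of namespaces, then per namespace: the namespace, the entry count, and the entries
            out.writeInt(stateMap.size());
            for (Map.Entry<N, Map<K, SV>> namespaceEntry : stateMap.entrySet()) {
                namespaceSerializer.serialize(namespaceEntry.getKey(), out);
                out.writeInt(namespaceEntry.getValue().size());
                for (Map.Entry<K, SV> entry : namespaceEntry.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), out);
                    stateSerializer.serialize(entry.getValue(), out);
                }
            }
            return bytes.toByteArray();
        }
    }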

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
new file mode 100644
index 0000000..d9474dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+@Deprecated
+public final class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -5280226231200217594L;
+	
+	/** the state data */
+	private final byte[] data;
+
+	/**
+	 * Creates a new ByteStreamStateHandle containing the given data.
+	 * 
+	 * @param data The state data.
+	 */
+	public ByteStreamStateHandle(byte[] data) {
+		this.data = data;
+	}
+
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+		ensureNotClosed();
+
+		ByteArrayInputStream stream = new ByteArrayInputStream(data);
+		registerCloseable(stream);
+
+		return stream;
+	}
+
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return data.length;
+	}
+
+	@Override
+	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+		SerializedStateHandle<T> serializableHandle = new SerializedStateHandle<T>(data);
+
+		// forward the closed status
+		if (isClosed()) {
+			try {
+				serializableHandle.close();
+			} catch (IOException e) {
+				// should not happen on a fresh handle, but forward anyways
+				throw new RuntimeException(e);
+			}
+		}
+
+		return serializableHandle;
+	}
+
+	public byte[] getData() {
+		return data;
+	}
+}
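
A small usage sketch (hypothetical, not part of this change): the handle reports the array length as its state size, serves the bytes as a stream, and toSerializableHandle() reinterprets the same bytes via Java serialization, which only works if the bytes were Java-serialized in the first place.

    import java.io.InputStream;

    import org.apache.flink.migration.runtime.state.StateHandle;
    import org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle;
    import org.apache.flink.util.InstantiationUtil;

    /** Hypothetical round-trip through a ByteStreamStateHandle holding Java-serialized bytes. */
    class ByteStreamHandleExample {

        static String roundTrip() throws Exception {
            byte[] serialized = InstantiationUtil.serializeObject("hello");

            ByteStreamStateHandle handle = new ByteStreamStateHandle(serialized);

            // the state size is simply the length of the byte array
            long sizeInBytes = handle.getStateSize();

            // the bytes can be read back as a raw stream ...
            try (InputStream in = handle.getState(ByteStreamHandleExample.class.getClassLoader())) {
                in.read();
            }

            // ... or reinterpreted as a Java-serialized object
            StateHandle<String> asObject = handle.toSerializableHandle();
            return asObject.getState(ByteStreamHandleExample.class.getClassLoader());
        }
    }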

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
new file mode 100644
index 0000000..d6c63c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+@Deprecated
+public class MemFoldingState<K, N, T, ACC> {
+
+	public static class Snapshot<K, N, T, ACC> extends AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
new file mode 100644
index 0000000..416a898
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+
+@Deprecated
+public class MemListState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
new file mode 100644
index 0000000..52d60a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Heap-backed partitioned {@link ReducingState} that is
+ * snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the values in the list state.
+ */
+@Deprecated
+public class MemReducingState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ReducingStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
new file mode 100644
index 0000000..ff9bed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Heap-backed key/value state that is snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+@Deprecated
+public class MemValueState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ValueStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
new file mode 100644
index 0000000..d3c9b6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A state handle that represents its state in serialized form as bytes.
+ *
+ * @param <T> The type of state represented by this state handle.
+ */
+public class SerializedStateHandle<T extends Serializable> extends AbstractCloseableHandle implements StateHandle<T> {
+	
+	private static final long serialVersionUID = 4145685722538475769L;
+
+	/** The serialized data */
+	private final byte[] serializedData;
+	
+	/**
+	 * Creates a new serialized state handle, eagerly serializing the given state object.
+	 * 
+	 * @param value The state object.
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public SerializedStateHandle(T value) throws IOException {
+		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+	}
+
+	/**
+	 * Creates a new serialized state handle, based on the given, already serialized data.
+	 * 
+	 * @param serializedData The serialized data.
+	 */
+	public SerializedStateHandle(byte[] serializedData) {
+		this.serializedData = serializedData;
+	}
+	
+	@Override
+	public T getState(ClassLoader classLoader) throws Exception {
+		if (classLoader == null) {
+			throw new NullPointerException();
+		}
+
+		ensureNotClosed();
+		return serializedData == null ? null : MigrationInstantiationUtil.<T>deserializeObject(serializedData, classLoader);
+	}
+
+	/**
+	 * Gets the size of the serialized state.
+	 * @return The size of the serialized state.
+	 */
+	public int getSizeOfSerializedState() {
+		return serializedData.length;
+	}
+
+	/**
+	 * Discarding heap-memory backed state is a no-op, so this method does nothing.
+	 */
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return serializedData.length;
+	}
+
+	public byte[] getSerializedData() {
+		return serializedData;
+	}
+}
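
A short sketch of the two constructors (hypothetical, not part of this change): one serializes the given object eagerly, the other wraps bytes that were serialized elsewhere; getState() deserializes lazily against the supplied class loader.

    import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
    import org.apache.flink.util.InstantiationUtil;

    /** Hypothetical example of creating and resolving serialized state handles. */
    class SerializedStateHandleExample {

        static String example() throws Exception {
            ClassLoader cl = SerializedStateHandleExample.class.getClassLoader();

            // eager serialization of a state object
            SerializedStateHandle<String> eager = new SerializedStateHandle<>("hello");

            // wrapping bytes that were already serialized elsewhere
            SerializedStateHandle<String> wrapped =
                    new SerializedStateHandle<>(InstantiationUtil.serializeObject("world"));

            // getStateSize() reports the length of the serialized form
            long totalBytes = eager.getStateSize() + wrapped.getStateSize();

            // deserialization happens lazily, against the given class loader
            return eager.getState(cl) + " " + wrapped.getState(cl);
        }
    }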

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
new file mode 100644
index 0000000..1bebcb6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+/**
+ * This class is just a {@link KeyGroupsStateHandle} that is tagged as migration, so that restore code can
+ * figure out which restore logic to apply, e.g. when restoring backend data from a state handle.
+ */
+@Internal
+@Deprecated
+public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
+
+	private static final long serialVersionUID = -8554427169776881697L;
+
+	/**
+	 * @param groupRangeOffsets range of key-group ids that are in the state of this handle
+	 * @param streamStateHandle handle to the actual state of the key-groups
+	 */
+	public MigrationKeyGroupStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+		super(groupRangeOffsets, streamStateHandle);
+	}
+}
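
To illustrate the tagging idea from the class comment, restore code can branch on the concrete handle type. The dispatch below is a hypothetical sketch, not the actual backend restore logic of this commit:

    import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;

    /** Hypothetical example of choosing a restore path based on the migration tag. */
    class RestoreDispatchExample {

        static void restore(KeyGroupsStateHandle handle) {
            if (handle instanceof MigrationKeyGroupStateHandle) {
                // state written by Flink 1.1: apply the legacy (migration) restore path
                restoreFromLegacySavepoint(handle);
            } else {
                // state written in the current format
                restoreFromCurrentFormat(handle);
            }
        }

        private static void restoreFromLegacySavepoint(KeyGroupsStateHandle handle) { /* hypothetical */ }

        private static void restoreFromCurrentFormat(KeyGroupsStateHandle handle) { /* hypothetical */ }
    }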

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
new file mode 100644
index 0000000..e7aa788
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+
+/**
+ * This class is just a {@link StreamStateHandle} that is tagged as migration, so that restore code can
+ * figure out which restore logic to apply, e.g. when restoring backend data from a state handle.
+ */
+@Internal
+@Deprecated
+public class MigrationStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -2332113722532150112L;
+	private final StreamStateHandle delegate;
+
+	public MigrationStreamStateHandle(StreamStateHandle delegate) {
+		this.delegate = delegate;
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return delegate.openInputStream();
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		delegate.discardState();
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return delegate.getStateSize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
new file mode 100644
index 0000000..f5af185
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+@Deprecated
+@Internal
+public class StreamTaskState implements Serializable, Closeable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private StateHandle<?> operatorState;
+
+	private StateHandle<Serializable> functionState;
+
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates;
+
+	// ------------------------------------------------------------------------
+
+	public StateHandle<?> getOperatorState() {
+		return operatorState;
+	}
+
+	public void setOperatorState(StateHandle<?> operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public StateHandle<Serializable> getFunctionState() {
+		return functionState;
+	}
+
+	public void setFunctionState(StateHandle<Serializable> functionState) {
+		this.functionState = functionState;
+	}
+
+	public HashMap<String, KvStateSnapshot<?, ?, ?, ?>> getKvStates() {
+		return kvStates;
+	}
+
+	public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates) {
+		this.kvStates = kvStates;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if this state object actually contains any state, or if all of the state
+	 * fields are null.
+	 * 
+	 * @return True, if all state is null, false if at least one state is not null.
+	 */
+	public boolean isEmpty() {
+		return operatorState == null && functionState == null && kvStates == null;
+	}
+
+
+	@Override
+	public void close() throws IOException {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
new file mode 100644
index 0000000..8b0dcd3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+@Deprecated
+@Internal
+public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The states for all operators */
+	private final StreamTaskState[] states;
+
+	public StreamTaskStateList(StreamTaskState[] states) throws Exception {
+		this.states = states;
+	}
+
+	public boolean isEmpty() {
+		for (StreamTaskState state : states) {
+			if (state != null) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
+		return states;
+	}
+
+	@Override
+	public void discardState() throws Exception {
+	}
+
+	@Override
+	public long getStateSize() throws Exception {
+		long sumStateSize = 0;
+
+		if (states != null) {
+			for (StreamTaskState state : states) {
+				if (state != null) {
+					StateHandle<?> operatorState = state.getOperatorState();
+					StateHandle<?> functionState = state.getFunctionState();
+					HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates = state.getKvStates();
+
+					if (operatorState != null) {
+						sumStateSize += operatorState.getStateSize();
+					}
+
+					if (functionState != null) {
+						sumStateSize += functionState.getStateSize();
+					}
+
+					if (kvStates != null) {
+						for (KvStateSnapshot<?, ?, ?, ?> kvState : kvStates.values()) {
+							if (kvState != null) {
+								sumStateSize += kvState.getStateSize();
+							}
+						}
+					}
+				}
+			}
+		}
+
+		// State size as sum of all state sizes
+		return sumStateSize;
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 66740c7..172e425 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -46,6 +46,7 @@ public class SavepointLoader {
 	 * @param jobId          The JobID of the job to load the savepoint for.
 	 * @param tasks          Tasks that will possibly be reset
 	 * @param savepointPath  The path of the savepoint to rollback to
+	 * @param userClassLoader The user code classloader
 	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
 	 * to any job vertex in tasks.
 	 *
@@ -56,10 +57,11 @@ public class SavepointLoader {
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			String savepointPath,
+			ClassLoader userClassLoader,
 			boolean allowNonRestoredState) throws IOException {
 
 		// (1) load the savepoint
-		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
+		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
 
 		// (2) validate it (parallelism, etc)

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 6a55b33..9d0f1e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -46,9 +46,10 @@ public interface SavepointSerializer<T extends Savepoint> {
 	 * Deserializes a savepoint from an input stream.
 	 *
 	 * @param dis Input stream to deserialize savepoint from
+	 * @param userCodeClassLoader The user code class loader
 	 * @return The deserialized savepoint
 	 * @throws IOException Serialization failures are forwarded
 	 */
-	T deserialize(DataInputStream dis) throws IOException;
+	T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
 
 }
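The deserialize signature now takes the user code class loader because a Flink 1.1 savepoint can contain Java-serialized user classes that only the job's class loader can resolve. As a rough, self-contained sketch of that idea (a hypothetical class, not the code used in this patch), an ObjectInputStream can be pointed at a specific loader like this:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectStreamClass;

    // Illustration only: an ObjectInputStream that resolves classes against a
    // caller-supplied class loader, which is the idea behind threading
    // userCodeClassLoader into deserialize(...) above.
    final class UserCodeObjectInputStream extends ObjectInputStream {

        private final ClassLoader userCodeClassLoader;

        UserCodeObjectInputStream(InputStream in, ClassLoader userCodeClassLoader) throws IOException {
            super(in);
            this.userCodeClassLoader = userCodeClassLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // prefer the user code class loader so user types in old snapshots resolve
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc);
            }
        }
    }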

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 20b3d89..3155d60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -30,10 +31,10 @@ public class SavepointSerializers {
 
 
 	private static final int SAVEPOINT_VERSION_0 = 0;
-	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
+	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
 
 	static {
-		SERIALIZERS.put(SAVEPOINT_VERSION_0, null);
+		SERIALIZERS.put(SAVEPOINT_VERSION_0, SavepointV0Serializer.INSTANCE);
 		SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 4b65418..48cca20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -50,7 +50,7 @@ public class SavepointStore {
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
 
 	/** Magic number for sanity checks against stored savepoints. */
-	private static final int MAGIC_NUMBER = 0x4960672d;
+	public static final int MAGIC_NUMBER = 0x4960672d;
 
 	/** Prefix for savepoint files. */
 	private static final String prefix = "savepoint-";
@@ -125,7 +125,7 @@ public class SavepointStore {
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwarded
 	 */
-	public static Savepoint loadSavepoint(String path) throws IOException {
+	public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
 		Preconditions.checkNotNull(path, "Path");
 
 		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
@@ -135,7 +135,7 @@ public class SavepointStore {
 				int version = dis.readInt();
 
 				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
-				return serializer.deserialize(dis);
+				return serializer.deserialize(dis, userClassLoader);
 			} else {
 				throw new RuntimeException("Unexpected magic number. This is most likely " +
 						"caused by trying to load a Flink 1.0 savepoint. You cannot load a " +

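The load path above reads a magic number as a sanity check, then a format version that selects the matching SavepointSerializer, and finally hands the stream plus the user class loader to that serializer. A minimal standalone sketch of this framing, with the per-version dispatch reduced to a switch and the payload simplified to a single string (names here are hypothetical, not Flink API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Sketch of the magic-number/version framing used above.
    public final class VersionedFramingSketch {

        static final int MAGIC_NUMBER = 0x4960672d;

        static byte[] write(int version, String payload) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream dos = new DataOutputStream(bos)) {
                dos.writeInt(MAGIC_NUMBER);
                dos.writeInt(version);
                dos.writeUTF(payload);
            }
            return bos.toByteArray();
        }

        static String read(byte[] bytes, ClassLoader userClassLoader) throws IOException {
            try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
                if (dis.readInt() != MAGIC_NUMBER) {
                    throw new IOException("Unexpected magic number; not a savepoint file");
                }
                int version = dis.readInt();
                switch (version) {
                    case 0:
                        // a real reader would use userClassLoader to resolve user classes
                        return "legacy payload: " + dis.readUTF();
                    case 1:
                        return "current payload: " + dis.readUTF();
                    default:
                        throw new IOException("Unknown savepoint version " + version);
                }
            }
        }

        public static void main(String[] args) throws IOException {
            byte[] data = write(1, "hello");
            System.out.println(read(data, VersionedFramingSketch.class.getClassLoader()));
        }
    }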
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 89f1f42..cd3e87f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -94,7 +94,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 	}
 
 	@Override
-	public SavepointV1 deserialize(DataInputStream dis) throws IOException {
+	public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
 		long checkpointId = dis.readLong();
 
 		// Task states
@@ -124,7 +124,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		return new SavepointV1(checkpointId, taskStates);
 	}
 
-	public static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
 
 		dos.writeLong(subtaskState.getDuration());
 
@@ -163,7 +163,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 
 	}
 
-	public static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
 
 		long duration = dis.readLong();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index ae71c7f..2daf896 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -92,7 +92,7 @@ public abstract class AbstractKeyedStateBackend<K>
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange) {
 
-		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+		this.kvStateRegistry = kvStateRegistry;
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
new file mode 100644
index 0000000..7492262
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.AbstractMultiFSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Wrapper class that takes multiple {@link StreamStateHandle} and makes them look like a single one. This is done by
+ * providing a contiguous view on all the streams of the inner handles through a wrapper stream and by summing up
+ * all the meta data.
+ */
+public class MultiStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -4588701089489569707L;
+	private final List<StreamStateHandle> stateHandles;
+	private final long stateSize;
+
+	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+		this.stateHandles = Preconditions.checkNotNull(stateHandles);
+		long calculateSize = 0L;
+		for (StreamStateHandle stateHandle : stateHandles) {
+			calculateSize += stateHandle.getStateSize();
+		}
+		this.stateSize = calculateSize;
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return new MultiFSDataInputStream(stateHandles);
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		StateUtil.bestEffortDiscardAllStateObjects(stateHandles);
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return stateSize;
+	}
+
+	static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
+
+		private final TreeMap<Long, StreamStateHandle> stateHandleMap;
+
+		public MultiFSDataInputStream(List<StreamStateHandle> stateHandles) throws IOException {
+			this.stateHandleMap = new TreeMap<>();
+			this.totalPos = 0L;
+			long calculateSize = 0L;
+			for (StreamStateHandle stateHandle : stateHandles) {
+				stateHandleMap.put(calculateSize, stateHandle);
+				calculateSize += stateHandle.getStateSize();
+			}
+			this.totalAvailable = calculateSize;
+
+			if (totalAvailable > 0L) {
+				StreamStateHandle first = stateHandleMap.firstEntry().getValue();
+				delegate = first.openInputStream();
+			}
+		}
+
+		@Override
+		protected FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException {
+			Map.Entry<Long, StreamStateHandle> handleEntry = stateHandleMap.floorEntry(globalStreamOffset);
+			if (handleEntry != null) {
+				FSDataInputStream stream = handleEntry.getValue().openInputStream();
+				stream.seek(globalStreamOffset - handleEntry.getKey());
+				return stream;
+			}
+			return null;
+		}
+	}
+}
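The core of MultiFSDataInputStream is the TreeMap that keys every inner handle by its start offset within the concatenated stream; floorEntry(globalStreamOffset) then yields the handle that covers a given global position, and the difference to the entry's key is the local offset to seek to. A small standalone sketch of just that lookup (the handle sizes are made-up example values):

    import java.util.Map;
    import java.util.TreeMap;

    // Standalone sketch of the offset lookup in MultiFSDataInputStream above: each
    // underlying handle is keyed by its start offset in the concatenated stream, and
    // floorEntry(globalOffset) finds the handle containing that offset.
    public final class OffsetLookupSketch {

        public static void main(String[] args) {
            long[] handleSizes = {100L, 250L, 75L};

            TreeMap<Long, Integer> startOffsetToHandle = new TreeMap<>();
            long offset = 0L;
            for (int i = 0; i < handleSizes.length; i++) {
                startOffsetToHandle.put(offset, i);
                offset += handleSizes[i];
            }

            long globalOffset = 180L; // lies inside the second handle (index 1)
            Map.Entry<Long, Integer> entry = startOffsetToHandle.floorEntry(globalOffset);
            long localOffset = globalOffset - entry.getKey();

            // prints: global 180 -> handle 1 at local offset 80
            System.out.println("global " + globalOffset + " -> handle " + entry.getValue()
                    + " at local offset " + localOffset);
        }
    }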

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 56be46f..aab2ee5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -28,12 +29,18 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -43,6 +50,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -103,7 +112,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
 		}
 
-		restorePartitionedState(restoredState);
+		if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+			restoreOldSavepointKeyedState(restoredState);
+		} else {
+			restorePartitionedState(restoredState);
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -346,4 +359,132 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return "HeapKeyedStateBackend";
 	}
 
+	/**
+	 * TODO: remove this internal accessor once the legacy savepoint restore path no longer needs it.
+	 */
+	@Internal
+	@Deprecated
+	public Map<String, StateTable<K, ?, ?>> getStateTables() {
+		return stateTables;
+	}
+
+	@Deprecated
+	private void restoreOldSavepointKeyedState(
+			Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
+
+		if (stateHandles.isEmpty()) {
+			return;
+		}
+
+		Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");
+
+		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
+				InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader);
+
+		for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
+
+			KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
+
+			final RestoredState restoredState;
+
+			if (genericSnapshot instanceof AbstractMemStateSnapshot) {
+
+				AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
+						(AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+
+				restoredState = restoreHeapState(stateSnapshot);
+
+			} else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
+
+				AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
+						(AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+				restoredState = restoreFsState(stateSnapshot);
+			} else {
+				throw new IllegalStateException("Unknown state: " + genericSnapshot);
+			}
+
+			Map rawResultMap = restoredState.getRawResultMap();
+			TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer();
+			TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();
+
+			if (namespaceSerializer instanceof VoidSerializer) {
+				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+			}
+
+			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
+
+			if (null != nullNameSpaceFix) {
+				rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
+			}
+
+			StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+			stateTable.getState().set(0, rawResultMap);
+
+			// add named state to the backend
+			getStateTables().put(nameToState.getKey(), stateTable);
+		}
+	}
+
+	private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
+		return new RestoredState(
+				stateSnapshot.deserialize(),
+				stateSnapshot.getNamespaceSerializer(),
+				stateSnapshot.getStateSerializer());
+	}
+
+	private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
+		FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
+		//TODO register closeable to support fast cancelation?
+		try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
+
+			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+
+			final int numNamespaces = inView.readInt();
+			HashMap rawResultMap = new HashMap<>(numNamespaces);
+
+			TypeSerializer<K> keySerializer = stateSnapshot.getKeySerializer();
+			TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
+			TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
+
+			for (int i = 0; i < numNamespaces; i++) {
+				Object namespace = namespaceSerializer.deserialize(inView);
+				final int numKV = inView.readInt();
+				Map<K, Object> namespaceMap = new HashMap<>(numKV);
+				rawResultMap.put(namespace, namespaceMap);
+				for (int j = 0; j < numKV; j++) {
+					K key = keySerializer.deserialize(inView);
+					Object value = stateSerializer.deserialize(inView);
+					namespaceMap.put(key, value);
+				}
+			}
+			return new RestoredState(rawResultMap, namespaceSerializer, stateSerializer);
+		} catch (Exception e) {
+			throw new IOException("Failed to restore state from file system", e);
+		}
+	}
+
+	static final class RestoredState {
+
+		private final Map rawResultMap;
+		private final TypeSerializer<?> namespaceSerializer;
+		private final TypeSerializer<?> stateSerializer;
+
+		public RestoredState(Map rawResultMap, TypeSerializer<?> namespaceSerializer, TypeSerializer<?> stateSerializer) {
+			this.rawResultMap = rawResultMap;
+			this.namespaceSerializer = namespaceSerializer;
+			this.stateSerializer = stateSerializer;
+		}
+
+		public Map getRawResultMap() {
+			return rawResultMap;
+		}
+
+		public TypeSerializer<?> getNamespaceSerializer() {
+			return namespaceSerializer;
+		}
+
+		public TypeSerializer<?> getStateSerializer() {
+			return stateSerializer;
+		}
+	}
 }
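restoreOldSavepointKeyedState above bridges two representation changes: a legacy snapshot that used VoidSerializer as its namespace serializer is switched to VoidNamespaceSerializer, and state that the old backend kept under a null namespace is re-registered under VoidNamespace.INSTANCE before being placed into a StateTable. A tiny sketch of the null-namespace remapping, with VOID_NAMESPACE standing in for the real singleton and a simplified namespace -> (key -> value) map layout:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the null-namespace fix-up performed in restoreOldSavepointKeyedState above.
    public final class NamespaceFixupSketch {

        private static final Object VOID_NAMESPACE = new Object();

        static void fixNullNamespace(Map<Object, Map<String, Integer>> rawResultMap) {
            Map<String, Integer> nullNamespaceState = rawResultMap.remove(null);
            if (nullNamespaceState != null) {
                // re-register the legacy "no namespace" state under the new singleton
                rawResultMap.put(VOID_NAMESPACE, nullNamespaceState);
            }
        }

        public static void main(String[] args) {
            Map<Object, Map<String, Integer>> raw = new HashMap<>();
            Map<String, Integer> perKey = new HashMap<>();
            perKey.put("key-1", 42);
            raw.put(null, perKey);          // legacy layout: null namespace

            fixNullNamespace(raw);

            System.out.println(raw.containsKey(null));            // false
            System.out.println(raw.containsKey(VOID_NAMESPACE));  // true
        }
    }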

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0ffca55..982efe8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -816,8 +816,10 @@ class JobManager(
       future {
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
-
-          val savepoint = SavepointStore.loadSavepoint(savepointPath)
+          // TODO: should this use the user code class loader instead?
+          val savepoint = SavepointStore.loadSavepoint(
+            savepointPath,
+            Thread.currentThread().getContextClassLoader)
 
           log.debug(s"$savepoint")
 
@@ -1199,7 +1201,7 @@ class JobManager(
               "Cannot set up the user code libraries: " + t.getMessage, t)
         }
 
-        val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+        var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
         if (userCodeLoader == null) {
           throw new JobSubmissionException(jobId,
             "The user code class loader could not be initialized.")
@@ -1316,9 +1318,13 @@ class JobManager(
                 log.info(s"Starting job from savepoint '$savepointPath'" +
                   (if (allowNonRestored) " (allowing non restored state)" else "") + ".")
 
-                // load the savepoint as a checkpoint into the system
-                val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
-                  jobId, executionGraph.getAllVertices, savepointPath, allowNonRestored)
+                // load the savepoint as a checkpoint into the system
+                val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
+                  jobId,
+                  executionGraph.getAllVertices,
+                  savepointPath,
+                  executionGraph.getUserClassLoader,
+                  allowNonRestored)
 
                 executionGraph.getCheckpointCoordinator.getCheckpointStore
                   .addCheckpoint(savepoint)