You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 13:06:30 UTC

[2/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
new file mode 100644
index 0000000..55f9b58
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StreamStateHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that produces an input stream when resolved.
+ */
+@Deprecated
+public interface StreamStateHandle extends StateHandle<InputStream> {
+
+	/**
+	 * Converts this stream state handle into a state handle that de-serializes
+	 * the stream into an object using Java's serialization mechanism.
+	 *
+	 * @return The state handle that automatically de-serializes.
+	 */
+	<T extends Serializable> StateHandle<T> toSerializableHandle();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
new file mode 100644
index 0000000..25a0e89
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateObject;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+@Deprecated
+public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject {
+
+	private static final long serialVersionUID = 350284443258002355L;
+
+	/** The path to the file in the filesystem, fully describing the file system */
+	private final Path filePath;
+
+	/** Cached file system handle */
+	private transient FileSystem fs;
+
+	/**
+	 * Creates a new file state for the given file path.
+	 * 
+	 * @param filePath The path to the file that stores the state.
+	 */
+	protected AbstractFileStateHandle(Path filePath) {
+		this.filePath = checkNotNull(filePath);
+	}
+
+	/**
+	 * Gets the path where this handle's state is stored.
+	 * @return The path where this handle's state is stored.
+	 */
+	public Path getFilePath() {
+		return filePath;
+	}
+
+	/**
+	 * Discard the state by deleting the file that stores the state. If the parent directory
+	 * of the state is empty after deleting the state file, it is also deleted.
+	 * 
+	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+	 */
+	@Override
+	public void discardState() throws Exception {
+		getFileSystem().delete(filePath, false);
+
+		// send a call to delete the checkpoint directory containing the file. This will
+		// fail (and be ignored) when some files still exist
+		try {
+			getFileSystem().delete(filePath.getParent(), false);
+		} catch (IOException ignored) {}
+	}
+
+	/**
+	 * Gets the file system that stores the file state.
+	 * @return The file system that stores the file state.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected FileSystem getFileSystem() throws IOException {
+		if (fs == null) {
+			fs = FileSystem.get(filePath.toUri());
+		}
+		return fs;
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected long getFileSize() throws IOException {
+		return getFileSystem().getFileStatus(filePath).getLen();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
new file mode 100644
index 0000000..59c373b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <N> The type of the namespace in the snapshot state.
+ * @param <SV> The type of the state value.
+ */
+@Deprecated
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
+		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	public AbstractFsStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		Path filePath) {
+		super(filePath);
+		this.stateDesc = stateDesc;
+		this.keySerializer = keySerializer;
+		this.stateSerializer = stateSerializer;
+		this.namespaceSerializer = namespaceSerializer;
+
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<SV> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public SD getStateDesc() {
+		return stateDesc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
new file mode 100644
index 0000000..ef908f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that points to state stored in a file via Java Serialization.
+ * 
+ * @param <T> The type of state pointed to by the state handle.
+ */
+@Deprecated
+public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileStateHandle implements StateHandle<T> {
+
+	private static final long serialVersionUID = -657631394290213622L;
+
+	/**
+	 * Creates a new FileSerializableStateHandle pointing to state at the given file path.
+	 * 
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileSerializableStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public T getState(ClassLoader classLoader) throws Exception {
+		ensureNotClosed();
+
+		try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
+			// make sure any deserialization can be aborted
+			registerCloseable(inStream);
+
+			ObjectInputStream ois = new MigrationInstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+			return (T) ois.readObject();
+		}
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
new file mode 100644
index 0000000..89ff4c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FileStreamStateHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A state handle that points to state in a file system, accessible as an input stream.
+ */
+@Deprecated
+public class FileStreamStateHandle extends AbstractFileStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -6826990484549987311L;
+
+	/**
+	 * Creates a new FileStreamStateHandle pointing to state at the given file path.
+	 * 
+	 * @param filePath The path to the file containing the checkpointed state.
+	 */
+	public FileStreamStateHandle(Path filePath) {
+		super(filePath);
+	}
+
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+		ensureNotClosed();
+
+		InputStream inStream = getFileSystem().open(getFilePath());
+		// make sure the state handle is cancelable
+		registerCloseable(inStream);
+
+		return inStream; 
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+
+	@Override
+	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+		FileSerializableStateHandle<T> handle = new FileSerializableStateHandle<>(getFilePath());
+
+		// forward closed status
+		if (isClosed()) {
+			try {
+				handle.close();
+			} catch (IOException e) {
+				// should not happen on a fresh handle, but forward anyways
+				throw new RuntimeException(e);
+			}
+		}
+
+		return handle;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
new file mode 100644
index 0000000..e1bac83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsFoldingState<K, N, T, ACC> {
+	public static class Snapshot<K, N, T, ACC> extends AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
new file mode 100644
index 0000000..d4e3d4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+import java.util.ArrayList;
+
+@Deprecated
+public class FsListState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
new file mode 100644
index 0000000..5cd9505
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsReducingState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ReducingStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
new file mode 100644
index 0000000..e964ec9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsStateBackend.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public class FsStateBackend extends AbstractStateBackend {
+
+	private static final long serialVersionUID = -8191916350224044011L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
+
+	/** By default, state smaller than 1024 bytes will not be written to files, but
+	 * will be stored directly with the metadata */
+	public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+	/** Maximum size of state that is stored with the metadata, rather than in files */
+	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+	
+	/** Default size for the write buffer */
+	private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+	
+
+	/** The path to the directory for the checkpoint data, including the file system
+	 * description via scheme and optional authority */
+	private final Path basePath = null;
+
+	/** State below this size will be stored as part of the metadata, rather than in files */
+	private final int fileStateThreshold = 0;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
new file mode 100644
index 0000000..3b432a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+
+@Deprecated
+public class FsValueState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ValueStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
new file mode 100644
index 0000000..3336556
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Deprecated
+public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
+		implements KvStateSnapshot<K, N, S, SD> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	/** The serialized data of the state key/value pairs */
+	private final byte[] data;
+	
+	private transient boolean closed;
+
+	/**
+	 * Creates a new heap memory state snapshot.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateSerializer The serializer for the elements in the state HashMap
+	 * @param stateDesc The state identifier
+	 * @param data The serialized data of the state key/value pairs
+	 */
+	public AbstractMemStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		byte[] data) {
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+		this.stateDesc = stateDesc;
+		this.data = data;
+	}
+
+	public HashMap<N, Map<K, SV>> deserialize() throws IOException {
+		DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+
+		final int numKeys = inView.readInt();
+		HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
+
+		for (int i = 0; i < numKeys && !closed; i++) {
+			N namespace = namespaceSerializer.deserialize(inView);
+			final int numValues = inView.readInt();
+			Map<K, SV> namespaceMap = new HashMap<>(numValues);
+			stateMap.put(namespace, namespaceMap);
+			for (int j = 0; j < numValues; j++) {
+				K key = keySerializer.deserialize(inView);
+				SV value = stateSerializer.deserialize(inView);
+				namespaceMap.put(key, value);
+			}
+		}
+		return stateMap;
+	}
+
+	/**
+	 * Discarding the heap state is a no-op.
+	 */
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return data.length;
+	}
+
+	@Override
+	public void close() {
+		closed = true;
+	}
+
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<SV> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public byte[] getData() {
+		return data;
+	}
+
+	@Override
+	public String toString() {
+		return "AbstractMemStateSnapshot{" +
+				"keySerializer=" + keySerializer +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				", stateDesc=" + stateDesc +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
new file mode 100644
index 0000000..d9474dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/ByteStreamStateHandle.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+@Deprecated
+public final class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -5280226231200217594L;
+	
+	/** the state data */
+	private final byte[] data;
+
+	/**
+	 * Creates a new ByteStreamStateHandle containing the given data.
+	 * 
+	 * @param data The state data.
+	 */
+	public ByteStreamStateHandle(byte[] data) {
+		this.data = data;
+	}
+
+	@Override
+	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+		ensureNotClosed();
+
+		ByteArrayInputStream stream = new ByteArrayInputStream(data);
+		registerCloseable(stream);
+
+		return stream;
+	}
+
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return data.length;
+	}
+
+	@Override
+	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+		SerializedStateHandle<T> serializableHandle = new SerializedStateHandle<T>(data);
+
+		// forward the closed status
+		if (isClosed()) {
+			try {
+				serializableHandle.close();
+			} catch (IOException e) {
+				// should not happen on a fresh handle, but forward anyways
+				throw new RuntimeException(e);
+			}
+		}
+
+		return serializableHandle;
+	}
+
+	public byte[] getData() {
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
new file mode 100644
index 0000000..d6c63c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+@Deprecated
+public class MemFoldingState<K, N, T, ACC> {
+
+	public static class Snapshot<K, N, T, ACC> extends AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ACC> stateSerializer,
+			FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
new file mode 100644
index 0000000..416a898
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+
+@Deprecated
+public class MemListState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
new file mode 100644
index 0000000..52d60a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Heap-backed partitioned {@link ReducingState} that is
+ * snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the values in the list state.
+ */
+@Deprecated
+public class MemReducingState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ReducingStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
new file mode 100644
index 0000000..ff9bed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Heap-backed key/value state that is snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+@Deprecated
+public class MemValueState<K, N, V> {
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ValueStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
new file mode 100644
index 0000000..d3c9b6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A state handle that represents its state in serialized form as bytes.
+ *
+ * @param <T> The type of state represented by this state handle.
+ */
+public class SerializedStateHandle<T extends Serializable> extends AbstractCloseableHandle implements StateHandle<T> {
+	
+	private static final long serialVersionUID = 4145685722538475769L;
+
+	/** The serialized data */
+	private final byte[] serializedData;
+	
+	/**
+	 * Creates a new serialized state handle, eagerly serializing the given state object.
+	 * 
+	 * @param value The state object.
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public SerializedStateHandle(T value) throws IOException {
+		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+	}
+
+	/**
+	 * Creates a new serialized state handle, based in the given already serialized data.
+	 * 
+	 * @param serializedData The serialized data.
+	 */
+	public SerializedStateHandle(byte[] serializedData) {
+		this.serializedData = serializedData;
+	}
+	
+	@Override
+	public T getState(ClassLoader classLoader) throws Exception {
+		if (classLoader == null) {
+			throw new NullPointerException();
+		}
+
+		ensureNotClosed();
+		return serializedData == null ? null : MigrationInstantiationUtil.<T>deserializeObject(serializedData, classLoader);
+	}
+
+	/**
+	 * Gets the size of the serialized state.
+	 * @return The size of the serialized state.
+	 */
+	public int getSizeOfSerializedState() {
+		return serializedData.length;
+	}
+
+	/**
+	 * Discarding heap-memory backed state is a no-op, so this method does nothing.
+	 */
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return serializedData.length;
+	}
+
+	public byte[] getSerializedData() {
+		return serializedData;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
new file mode 100644
index 0000000..1bebcb6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+@Internal
+@Deprecated
+/**
+ * This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply,
+ * e.g. when restoring backend data from a state handle.
+ */
+public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
+
+	private static final long serialVersionUID = -8554427169776881697L;
+
+	/**
+	 * @param groupRangeOffsets range of key-group ids that in the state of this handle
+	 * @param streamStateHandle handle to the actual state of the key-groups
+	 */
+	public MigrationKeyGroupStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+		super(groupRangeOffsets, streamStateHandle);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
new file mode 100644
index 0000000..e7aa788
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+
+@Internal
+@Deprecated
+/**
+ * This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g.
+ * when restoring backend data from a state handle.
+ */
+public class MigrationStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -2332113722532150112L;
+	private final StreamStateHandle delegate;
+
+	public MigrationStreamStateHandle(StreamStateHandle delegate) {
+		this.delegate = delegate;
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return delegate.openInputStream();
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		delegate.discardState();
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return delegate.getStateSize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
new file mode 100644
index 0000000..f5af185
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+@Deprecated
+@Internal
+public class StreamTaskState implements Serializable, Closeable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private StateHandle<?> operatorState;
+
+	private StateHandle<Serializable> functionState;
+
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates;
+
+	// ------------------------------------------------------------------------
+
+	public StateHandle<?> getOperatorState() {
+		return operatorState;
+	}
+
+	public void setOperatorState(StateHandle<?> operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public StateHandle<Serializable> getFunctionState() {
+		return functionState;
+	}
+
+	public void setFunctionState(StateHandle<Serializable> functionState) {
+		this.functionState = functionState;
+	}
+
+	public HashMap<String, KvStateSnapshot<?, ?, ?, ?>> getKvStates() {
+		return kvStates;
+	}
+
+	public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates) {
+		this.kvStates = kvStates;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if this state object actually contains any state, or if all of the state
+	 * fields are null.
+	 * 
+	 * @return True, if all state is null, false if at least one state is not null.
+	 */
+	public boolean isEmpty() {
+		return operatorState == null & functionState == null & kvStates == null;
+	}
+
+
+	@Override
+	public void close() throws IOException {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
new file mode 100644
index 0000000..8b0dcd3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+@Deprecated
+@Internal
+public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The states for all operator */
+	private final StreamTaskState[] states;
+
+	public StreamTaskStateList(StreamTaskState[] states) throws Exception {
+		this.states = states;
+	}
+
+	public boolean isEmpty() {
+		for (StreamTaskState state : states) {
+			if (state != null) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
+		return states;
+	}
+
+	@Override
+	public void discardState() throws Exception {
+	}
+
+	@Override
+	public long getStateSize() throws Exception {
+		long sumStateSize = 0;
+
+		if (states != null) {
+			for (StreamTaskState state : states) {
+				if (state != null) {
+					StateHandle<?> operatorState = state.getOperatorState();
+					StateHandle<?> functionState = state.getFunctionState();
+					HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates = state.getKvStates();
+
+					if (operatorState != null) {
+						sumStateSize += operatorState.getStateSize();
+					}
+
+					if (functionState != null) {
+						sumStateSize += functionState.getStateSize();
+					}
+
+					if (kvStates != null) {
+						for (KvStateSnapshot<?, ?, ?, ?> kvState : kvStates.values()) {
+							if (kvState != null) {
+								sumStateSize += kvState.getStateSize();
+							}
+						}
+					}
+				}
+			}
+		}
+
+		// State size as sum of all state sizes
+		return sumStateSize;
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 66740c7..172e425 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -46,6 +46,7 @@ public class SavepointLoader {
 	 * @param jobId          The JobID of the job to load the savepoint for.
 	 * @param tasks          Tasks that will possibly be reset
 	 * @param savepointPath  The path of the savepoint to rollback to
+	 * @param userClassLoader The user code classloader
 	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
 	 * to any job vertex in tasks.
 	 *
@@ -56,10 +57,11 @@ public class SavepointLoader {
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			String savepointPath,
+			ClassLoader userClassLoader,
 			boolean allowNonRestoredState) throws IOException {
 
 		// (1) load the savepoint
-		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
+		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
 
 		// (2) validate it (parallelism, etc)

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 6a55b33..9d0f1e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -46,9 +46,10 @@ public interface SavepointSerializer<T extends Savepoint> {
 	 * Deserializes a savepoint from an input stream.
 	 *
 	 * @param dis Input stream to deserialize savepoint from
+	 * @param  userCodeClassLoader the user code class loader
 	 * @return The deserialized savepoint
 	 * @throws IOException Serialization failures are forwarded
 	 */
-	T deserialize(DataInputStream dis) throws IOException;
+	T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 20b3d89..3155d60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -30,10 +31,10 @@ public class SavepointSerializers {
 
 
 	private static final int SAVEPOINT_VERSION_0 = 0;
-	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
+	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
 
 	static {
-		SERIALIZERS.put(SAVEPOINT_VERSION_0, null);
+		SERIALIZERS.put(SAVEPOINT_VERSION_0, SavepointV0Serializer.INSTANCE);
 		SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 4b65418..48cca20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -50,7 +50,7 @@ public class SavepointStore {
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
 
 	/** Magic number for sanity checks against stored savepoints. */
-	private static final int MAGIC_NUMBER = 0x4960672d;
+	public static final int MAGIC_NUMBER = 0x4960672d;
 
 	/** Prefix for savepoint files. */
 	private static final String prefix = "savepoint-";
@@ -125,7 +125,7 @@ public class SavepointStore {
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwared
 	 */
-	public static Savepoint loadSavepoint(String path) throws IOException {
+	public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
 		Preconditions.checkNotNull(path, "Path");
 
 		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
@@ -135,7 +135,7 @@ public class SavepointStore {
 				int version = dis.readInt();
 
 				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
-				return serializer.deserialize(dis);
+				return serializer.deserialize(dis, userClassLoader);
 			} else {
 				throw new RuntimeException("Unexpected magic number. This is most likely " +
 						"caused by trying to load a Flink 1.0 savepoint. You cannot load a " +

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 89f1f42..cd3e87f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -94,7 +94,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 	}
 
 	@Override
-	public SavepointV1 deserialize(DataInputStream dis) throws IOException {
+	public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
 		long checkpointId = dis.readLong();
 
 		// Task states
@@ -124,7 +124,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		return new SavepointV1(checkpointId, taskStates);
 	}
 
-	public static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
 
 		dos.writeLong(subtaskState.getDuration());
 
@@ -163,7 +163,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 
 	}
 
-	public static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
 
 		long duration = dis.readLong();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index ae71c7f..2daf896 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -92,7 +92,7 @@ public abstract class AbstractKeyedStateBackend<K>
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange) {
 
-		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+		this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
new file mode 100644
index 0000000..7492262
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.AbstractMultiFSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Wrapper class that takes multiple {@link StreamStateHandle} and makes them look like a single one. This is done by
+ * providing a contiguous view on all the streams of the inner handles through a wrapper stream and by summing up all
+ * all the meta data.
+ */
+public class MultiStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = -4588701089489569707L;
+	private final List<StreamStateHandle> stateHandles;
+	private final long stateSize;
+
+	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+		this.stateHandles = Preconditions.checkNotNull(stateHandles);
+		long calculateSize = 0L;
+		for(StreamStateHandle stateHandle : stateHandles) {
+			calculateSize += stateHandle.getStateSize();
+		}
+		this.stateSize = calculateSize;
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return new MultiFSDataInputStream(stateHandles);
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		StateUtil.bestEffortDiscardAllStateObjects(stateHandles);
+	}
+
+	@Override
+	public long getStateSize() throws IOException {
+		return stateSize;
+	}
+
+	static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
+
+		private final TreeMap<Long, StreamStateHandle> stateHandleMap;
+
+		public MultiFSDataInputStream(List<StreamStateHandle> stateHandles) throws IOException {
+			this.stateHandleMap = new TreeMap<>();
+			this.totalPos = 0L;
+			long calculateSize = 0L;
+			for (StreamStateHandle stateHandle : stateHandles) {
+				stateHandleMap.put(calculateSize, stateHandle);
+				calculateSize += stateHandle.getStateSize();
+			}
+			this.totalAvailable = calculateSize;
+
+			if (totalAvailable > 0L) {
+				StreamStateHandle first = stateHandleMap.firstEntry().getValue();
+				delegate = first.openInputStream();
+			}
+		}
+
+		@Override
+		protected FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException {
+			Map.Entry<Long, StreamStateHandle> handleEntry = stateHandleMap.floorEntry(globalStreamOffset);
+			if (handleEntry != null) {
+				FSDataInputStream stream = handleEntry.getValue().openInputStream();
+				stream.seek(globalStreamOffset - handleEntry.getKey());
+				return stream;
+			}
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 56be46f..aab2ee5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -28,12 +29,18 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -43,6 +50,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -103,7 +112,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
 		}
 
-		restorePartitionedState(restoredState);
+		if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+			restoreOldSavepointKeyedState(restoredState);
+		} else {
+			restorePartitionedState(restoredState);
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -346,4 +359,132 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return "HeapKeyedStateBackend";
 	}
 
+	/**
+	 * REMOVE
+	 */
+	@Internal
+	@Deprecated
+	public Map<String, StateTable<K, ?, ?>> getStateTables() {
+		return stateTables;
+	}
+
+	@Deprecated
+	private void restoreOldSavepointKeyedState(
+			Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
+
+		if (stateHandles.isEmpty()) {
+			return;
+		}
+
+		Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");
+
+		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
+				InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader);
+
+		for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
+
+			KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue();
+
+			final RestoredState restoredState;
+
+			if (genericSnapshot instanceof AbstractMemStateSnapshot) {
+
+				AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
+						(AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+
+				restoredState = restoreHeapState(stateSnapshot);
+
+			} else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
+
+				AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot =
+						(AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue();
+				restoredState = restoreFsState(stateSnapshot);
+			} else {
+				throw new IllegalStateException("Unknown state: " + genericSnapshot);
+			}
+
+			Map rawResultMap = restoredState.getRawResultMap();
+			TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer();
+			TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();
+
+			if (namespaceSerializer instanceof VoidSerializer) {
+				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
+			}
+
+			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
+
+			if (null != nullNameSpaceFix) {
+				rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
+			}
+
+			StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+			stateTable.getState().set(0, rawResultMap);
+
+			// add named state to the backend
+			getStateTables().put(nameToState.getKey(), stateTable);
+		}
+	}
+
+	private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
+		return new RestoredState(
+				stateSnapshot.deserialize(),
+				stateSnapshot.getNamespaceSerializer(),
+				stateSnapshot.getStateSerializer());
+	}
+
+	private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
+		FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
+		//TODO register closeable to support fast cancelation?
+		try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
+
+			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+
+			final int numNamespaces = inView.readInt();
+			HashMap rawResultMap = new HashMap<>(numNamespaces);
+
+			TypeSerializer<K> keySerializer = stateSnapshot.getKeySerializer();
+			TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
+			TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
+
+			for (int i = 0; i < numNamespaces; i++) {
+				Object namespace = namespaceSerializer.deserialize(inView);
+				final int numKV = inView.readInt();
+				Map<K, Object> namespaceMap = new HashMap<>(numKV);
+				rawResultMap.put(namespace, namespaceMap);
+				for (int j = 0; j < numKV; j++) {
+					K key = keySerializer.deserialize(inView);
+					Object value = stateSerializer.deserialize(inView);
+					namespaceMap.put(key, value);
+				}
+			}
+			return new RestoredState(rawResultMap, namespaceSerializer, stateSerializer);
+		} catch (Exception e) {
+			throw new IOException("Failed to restore state from file system", e);
+		}
+	}
+
+	static final class RestoredState {
+
+		private final Map rawResultMap;
+		private final TypeSerializer<?> namespaceSerializer;
+		private final TypeSerializer<?> stateSerializer ;
+
+		public RestoredState(Map rawResultMap, TypeSerializer<?> namespaceSerializer, TypeSerializer<?> stateSerializer) {
+			this.rawResultMap = rawResultMap;
+			this.namespaceSerializer = namespaceSerializer;
+			this.stateSerializer = stateSerializer;
+		}
+
+		public Map getRawResultMap() {
+			return rawResultMap;
+		}
+
+		public TypeSerializer<?> getNamespaceSerializer() {
+			return namespaceSerializer;
+		}
+
+		public TypeSerializer<?> getStateSerializer() {
+			return stateSerializer;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0ffca55..982efe8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -816,8 +816,10 @@ class JobManager(
       future {
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
-
-          val savepoint = SavepointStore.loadSavepoint(savepointPath)
+          //TODO user code class loader ?
+          val savepoint = SavepointStore.loadSavepoint(
+            savepointPath,
+            Thread.currentThread().getContextClassLoader)
 
           log.debug(s"$savepoint")
 
@@ -1199,7 +1201,7 @@ class JobManager(
               "Cannot set up the user code libraries: " + t.getMessage, t)
         }
 
-        val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+        var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
         if (userCodeLoader == null) {
           throw new JobSubmissionException(jobId,
             "The user code class loader could not be initialized.")
@@ -1316,9 +1318,13 @@ class JobManager(
                 log.info(s"Starting job from savepoint '$savepointPath'" +
                   (if (allowNonRestored) " (allowing non restored state)" else "") + ".")
 
-                // load the savepoint as a checkpoint into the system
-                val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
-                  jobId, executionGraph.getAllVertices, savepointPath, allowNonRestored)
+                  // load the savepoint as a checkpoint into the system
+                  val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
+                    jobId,
+                    executionGraph.getAllVertices,
+                    savepointPath,
+                    executionGraph.getUserClassLoader,
+                    allowNonRestored)
 
                 executionGraph.getCheckpointCoordinator.getCheckpointStore
                   .addCheckpoint(savepoint)