You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:01 UTC

[2/5] flink git commit: [FLINK-5051] Add Serde Proxies for Serializers and State Backend Data

[FLINK-5051] Add Serde Proxies for Serializers and State Backend Data


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21d1d8b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21d1d8b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21d1d8b4

Branch: refs/heads/master
Commit: 21d1d8b49337e734ce3defb5f1b9344f748cb49e
Parents: e95fe56
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 108 +++++----
 .../common/state/FoldingStateDescriptor.java    |   5 +
 .../api/common/state/ListStateDescriptor.java   |   5 +
 .../common/state/ReducingStateDescriptor.java   |   5 +
 .../flink/api/common/state/StateDescriptor.java |   7 +
 .../api/common/state/ValueStateDescriptor.java  |   5 +
 .../api/common/typeutils/TypeSerializer.java    |  10 +-
 .../TypeSerializerSerializationProxy.java       | 231 +++++++++++++++++++
 .../flink/core/io/VersionMismatchException.java |  44 ++++
 .../core/io/VersionedIOReadableWritable.java    |  70 ++++++
 .../memory/ByteArrayInputStreamWithPos.java     |   4 +
 .../memory/ByteArrayOutputStreamWithPos.java    |   4 +
 .../TypeSerializerSerializationProxyTest.java   |  94 ++++++++
 .../flink/core/io/VersionedIOWriteableTest.java | 148 ++++++++++++
 .../state/DefaultOperatorStateBackend.java      | 195 +++++++++++-----
 .../flink/runtime/state/JavaSerializer.java     |  16 +-
 .../state/KeyedBackendSerializationProxy.java   | 215 +++++++++++++++++
 .../OperatorBackendSerializationProxy.java      | 140 +++++++++++
 .../state/RegisteredBackendStateMetaInfo.java   | 132 +++++++++++
 .../state/heap/HeapKeyedStateBackend.java       | 125 ++++++----
 .../flink/runtime/state/heap/StateTable.java    |  26 ++-
 .../runtime/query/QueryableStateClientTest.java |  12 +-
 .../runtime/state/OperatorStateBackendTest.java |   3 +-
 .../runtime/state/SerializationProxiesTest.java |  99 ++++++++
 .../runtime/state/StateBackendTestBase.java     |   6 +-
 25 files changed, 1534 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5fef5e5..1c0a4b7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -50,6 +50,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.InstantiationUtil;
@@ -123,7 +125,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation;
+	private Map<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> kvStateInformation;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -237,7 +239,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// and access it in a synchronized block that locks on #dbDisposeLock.
 			if (db != null) {
 
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+				for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
+						kvStateInformation.values()) {
+
 					column.f0.close();
 				}
 
@@ -492,18 +496,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		private void writeKVStateMetaData() throws IOException, InterruptedException {
-			//write number of k/v states
-			outputView.writeInt(stateBackend.kvStateInformation.size());
+
+			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+					new ArrayList<>(stateBackend.kvStateInformation.size());
 
 			int kvStateId = 0;
-			//iterate all column families, where each column family holds one k/v state, to write the metadata
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> column :
+					stateBackend.kvStateInformation.entrySet()) {
+
+				RegisteredBackendStateMetaInfo<?, ?> metaInfo = column.getValue().f1;
 
-				//be cooperative and check for interruption from time to time in the hot loop
-				checkInterrupted();
+				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
+						new KeyedBackendSerializationProxy.StateMetaInfo<>(
+								metaInfo.getStateType(),
+								metaInfo.getName(),
+								metaInfo.getNamespaceSerializer(),
+								metaInfo.getStateSerializer());
 
-				//write StateDescriptor for this k/v state
-				InstantiationUtil.serializeObject(outStream, column.getValue().f1);
+				metaInfoList.add(metaInfoProxy);
 
 				//retrieve iterator for this k/v states
 				readOptions = new ReadOptions();
@@ -512,6 +522,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
 				++kvStateId;
 			}
+
+			KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoList);
+
+			serializationProxy.write(outputView);
 		}
 
 		private void writeKVStateData() throws IOException, InterruptedException {
@@ -691,30 +706,35 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws RocksDBException
 		 */
 		private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
-			//read number of k/v states
-			int numColumns = currentStateHandleInView.readInt();
 
-			//those two lists are aligned and should later have the same size!
-			currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns);
+			KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
 
-			//restore the empty columns for the k/v states through the metadata
-			for (int i = 0; i < numColumns; i++) {
+			serializationProxy.read(currentStateHandleInView);
 
-				StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject(
-						currentStateHandleInStream,
-						rocksDBKeyedStateBackend.userCodeClassLoader);
+			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList =
+					serializationProxy.getNamedStateSerializationProxies();
 
-				Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend.
-						kvStateInformation.get(stateDescriptor.getName());
+			currentStateHandleKVStateColumnFamilies = new ArrayList<>(metaInfoProxyList.size());
+
+			for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy : metaInfoProxyList) {
+				Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> columnFamily =
+						rocksDBKeyedStateBackend.kvStateInformation.get(metaInfoProxy.getStateName());
 
 				if (null == columnFamily) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-							stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+							metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+
+					RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
+							new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
 
-					columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
-							rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+					columnFamily = new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+							rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor),
+							stateMetaInfo);
 
-					rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
+					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), columnFamily);
+				} else {
+					//TODO we could check here for incompatible serializer versions between previous tasks
 				}
 
 				currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
@@ -776,15 +796,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
 	 * that we checkpointed, i.e. is already in the map of column families.
 	 */
-	protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) {
+	@SuppressWarnings("rawtypes, unchecked")
+	protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
+
+		Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
+				kvStateInformation.get(descriptor.getName());
 
-		Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName());
+		RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
+				descriptor.getType(),
+				descriptor.getName(),
+				namespaceSerializer,
+				descriptor.getSerializer());
 
 		if (stateInfo != null) {
-			if (!stateInfo.f1.equals(descriptor)) {
-				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 +
-						" trying access with " + descriptor);
+			if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+				throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
+						" trying access with " + newMetaInfo);
 			}
+			stateInfo.f1 = newMetaInfo;
 			return stateInfo.f0;
 		}
 
@@ -793,9 +822,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
-			Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple =
-					new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor);
-			kvStateInformation.put(descriptor.getName(), tuple);
+			Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<N, S>> tuple =
+					new Tuple2<>(columnFamily, newMetaInfo);
+			Map rawAccess = kvStateInformation;
+			rawAccess.put(descriptor.getName(), tuple);
 			return columnFamily;
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
@@ -806,7 +836,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBValueState<>(columnFamily, namespaceSerializer,  stateDesc, this);
 	}
@@ -815,7 +845,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
@@ -824,7 +854,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBReducingState<>(columnFamily, namespaceSerializer,  stateDesc, this);
 	}
@@ -833,7 +863,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
@@ -1116,7 +1146,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyMapping.put(mappingByte, stateDescriptor);
 
 			// this will fill in the k/v state information
-			getColumnFamily(stateDescriptor);
+			getColumnFamily(stateDescriptor, null);
 		}
 
 		// try and read until EOF
@@ -1124,7 +1154,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// the EOFException will get us out of this...
 			while (true) {
 				byte mappingByte = inputView.readByte();
-				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
 				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
 
 				ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 32fa9f3..143945e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -138,4 +138,9 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 				", foldFunction=" + foldFunction +
 				'}';
 	}
+
+	@Override
+	public Type getType() {
+		return Type.FOLDING;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 077109c..6861a07 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -101,4 +101,9 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 				"serializer=" + serializer +
 				'}';
 	}
+
+	@Override
+	public Type getType() {
+		return Type.LIST;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 8d79da4..a1d4225 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -126,4 +126,9 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 				", reduceFunction=" + reduceFunction +
 				'}';
 	}
+
+	@Override
+	public Type getType() {
+		return Type.REDUCING;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 483c954..de3cd4e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -48,6 +48,11 @@ import static java.util.Objects.requireNonNull;
  */
 @PublicEvolving
 public abstract class StateDescriptor<S extends State, T> implements Serializable {
+
+	public enum Type {
+		VALUE, LIST, REDUCING, FOLDING, @Deprecated UNKNOWN
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	/** Name that uniquely identifies state created from this StateDescriptor. */
@@ -267,6 +272,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 				'}';
 	}
 
+	public abstract Type getType();
+
 	// ------------------------------------------------------------------------
 	//  Serialization
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 10bcd58..7db9116 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -110,4 +110,9 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 				", serializer=" + serializer +
 				'}';
 	}
+
+	@Override
+	public Type getType() {
+		return Type.VALUE;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 5e81db7..ac7fbc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import java.io.IOException;
-import java.io.Serializable;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 /**
  * This interface describes the methods that are required for a data type to be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and copying methods.
@@ -160,4 +160,8 @@ public abstract class TypeSerializer<T> implements Serializable {
 	public abstract boolean canEqual(Object obj);
 
 	public abstract int hashCode();
+
+	public boolean isCompatibleWith(TypeSerializer<?> other) {
+		return equals(other);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
new file mode 100644
index 0000000..06ad8bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
+
+	public static final int VERSION = 1;
+	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
+
+	private ClassLoader userClassLoader;
+
+	private TypeSerializer<T> typeSerializer;
+
+	private boolean ignoreClassNotFound;
+
+	public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean ignoreClassNotFound) {
+		this.userClassLoader = userClassLoader;
+		this.ignoreClassNotFound = ignoreClassNotFound;
+	}
+
+	public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
+		this(userClassLoader, false);
+	}
+
+	public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
+		this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
+		this.ignoreClassNotFound = false;
+	}
+
+	public TypeSerializer<T> getTypeSerializer() {
+		return typeSerializer;
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		if (typeSerializer instanceof ClassNotFoundDummyTypeSerializer) {
+			ClassNotFoundDummyTypeSerializer<T> dummyTypeSerializer =
+					(ClassNotFoundDummyTypeSerializer<T>) this.typeSerializer;
+
+			byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
+			out.write(serializerBytes.length);
+			out.write(serializerBytes);
+		} else {
+			// write in a way that allows the stream to recover from exceptions
+			try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
+				InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
+				out.writeInt(streamWithPos.getPosition());
+				out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
+			}
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		// read in a way that allows the stream to recover from exceptions
+		int serializerBytes = in.readInt();
+		byte[] buffer = new byte[serializerBytes];
+		in.read(buffer);
+		try {
+			typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+		} catch (ClassNotFoundException e) {
+			if (ignoreClassNotFound) {
+				// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
+				// a proper typeserializer from the user
+				typeSerializer =
+						new ClassNotFoundDummyTypeSerializer<>(buffer);
+				LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+			} else {
+				throw new IOException("Missing class for type serializer.", e);
+			}
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		TypeSerializerSerializationProxy<?> that = (TypeSerializerSerializationProxy<?>) o;
+
+		return getTypeSerializer() != null ? getTypeSerializer().equals(that.getTypeSerializer()) : that.getTypeSerializer() == null;
+	}
+
+	@Override
+	public int hashCode() {
+		return getTypeSerializer() != null ? getTypeSerializer().hashCode() : 0;
+	}
+
+	public boolean isIgnoreClassNotFound() {
+		return ignoreClassNotFound;
+	}
+
+	public void setIgnoreClassNotFound(boolean ignoreClassNotFound) {
+		this.ignoreClassNotFound = ignoreClassNotFound;
+	}
+
+	/**
+	 * Dummy TypeSerializer to avoid that data is lost when checkpointing again a serializer for which we encountered
+	 * a {@link ClassNotFoundException}.
+	 */
+	static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
+
+		private static final long serialVersionUID = 2526330533671642711L;
+		private final byte[] actualBytes;
+
+		public ClassNotFoundDummyTypeSerializer(byte[] actualBytes) {
+			this.actualBytes = Preconditions.checkNotNull(actualBytes);
+		}
+
+		public byte[] getActualBytes() {
+			return actualBytes;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public TypeSerializer<T> duplicate() {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public T createInstance() {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public T copy(T from) {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public T copy(T from, T reuse) {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public int getLength() {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public void serialize(T record, DataOutputView target) throws IOException {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public T deserialize(DataInputView source) throws IOException {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public T deserialize(T reuse, DataInputView source) throws IOException {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return false;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			ClassNotFoundDummyTypeSerializer<?> that = (ClassNotFoundDummyTypeSerializer<?>) o;
+
+			return Arrays.equals(getActualBytes(), that.getActualBytes());
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(getActualBytes());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
new file mode 100644
index 0000000..3ff88e9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that incompatible versions have been found during serialization.
+ */
+public class VersionMismatchException extends IOException {
+
+	private static final long serialVersionUID = 7024258967585372438L;
+
+	public VersionMismatchException() {
+	}
+
+	public VersionMismatchException(String message) {
+		super(message);
+	}
+
+	public VersionMismatchException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public VersionMismatchException(Throwable cause) {
+		super(cause);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
new file mode 100644
index 0000000..94c2722
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * This is the abstract base class for {@link IOReadableWritable} which allows to differentiate between serialization
+ * versions. Concrete subclasses should typically override the {@link #write(DataOutputView)} and
+ * {@link #read(DataInputView)}, thereby calling super to ensure version checking.
+ */
+public abstract class VersionedIOReadableWritable implements IOReadableWritable, Versioned {
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(getVersion());
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		int foundVersion = in.readInt();
+		resolveVersionRead(foundVersion);
+	}
+
+	/**
+	 * This method is a hook to react on the version tag that we find during read. This can also be used to initialize
+	 * further read logic w.r.t. the version at hand.
+	 * Default implementation of this method just checks the compatibility of a version number against the own version.
+	 *
+	 * @param foundVersion the version found from reading the input stream
+	 * @throws VersionMismatchException thrown when serialization versions mismatch
+	 */
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		if (!isCompatibleVersion(foundVersion)) {
+			long expectedVersion = getVersion();
+			throw new VersionMismatchException(
+					"Incompatible version: found " + foundVersion + ", required " + expectedVersion);
+		}
+	}
+
+	/**
+	 * Checks for compatibility between this and the found version. Subclasses can override this methods in case of
+	 * intended backwards backwards compatibility.
+	 *
+	 * @param version version number to compare against.
+	 * @return true, iff this is compatible to the passed version.
+	 */
+	public boolean isCompatibleVersion(int version) {
+		return getVersion() == version;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index c25f491..46b82c7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -115,4 +115,8 @@ public class ByteArrayInputStreamWithPos extends InputStream {
 	public int getPosition() {
 		return position;
 	}
+
+	public void setPos(int pos) {
+		this.position = pos;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index df5b34a..ebaf1b9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -114,4 +114,8 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 	@Override
 	public void close() throws IOException {
 	}
+
+	public byte[] getBuf() {
+		return buffer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
new file mode 100644
index 0000000..982e7ff
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.fail;
+
+public class TypeSerializerSerializationProxyTest {
+
+	@Test
+	public void testStateSerializerSerializationProxy() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			proxy.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(serializer, proxy.getTypeSerializer());
+	}
+
+	@Test
+	public void testStateSerializerSerializationProxyClassNotFound() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			proxy.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+			fail("ClassNotFoundException expected, leading to IOException");
+		} catch (IOException expected) {
+
+		}
+
+		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
+
+		Assert.assertArrayEquals(
+				InstantiationUtil.serializeObject(serializer),
+				((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
new file mode 100644
index 0000000..b7b6d6f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class VersionedIOWriteableTest {
+
+	@Test
+	public void testReadSameVersion() throws Exception {
+
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(1);
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(payload, testWriteable.getData());
+	}
+
+	@Test
+	public void testReadCompatibleVersion() throws Exception {
+
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(2) {
+			@Override
+			public boolean isCompatibleVersion(int version) {
+				return getVersion() >= version;
+			}
+		};
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(payload, testWriteable.getData());
+	}
+
+	@Test
+	public void testReadMismatchVersion() throws Exception {
+
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(2);
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+			Assert.fail("Version mismatch expected.");
+		} catch (VersionMismatchException ignored) {
+
+		}
+
+		Assert.assertEquals(null, testWriteable.getData());
+	}
+
+	static class TestWriteable extends VersionedIOReadableWritable {
+
+		private final int version;
+		private String data;
+
+		public TestWriteable(int version) {
+			this(version, null);
+		}
+
+		public TestWriteable(int version, String data) {
+			this.version = version;
+			this.data = data;
+		}
+
+		@Override
+		public int getVersion() {
+			return version;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(data);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			this.data = in.readUTF();
+		}
+
+		@Override
+		protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+			super.resolveVersionRead(foundVersion);
+		}
+
+		@Override
+		public boolean isCompatibleVersion(int version) {
+			return super.isCompatibleVersion(version);
+		}
+
+		public String getData() {
+			return data;
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 5b47362..d7a10d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -52,6 +53,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	private final Collection<OperatorStateHandle> restoreSnapshots;
 	private final CloseableRegistry closeStreamOnCancelRegistry;
 	private final JavaSerializer<Serializable> javaSerializer;
+	private final ClassLoader userClassloader;
 
 	/**
 	 * Restores a OperatorStateStore (lazily) using the provided snapshots.
@@ -60,21 +62,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	 */
 	public DefaultOperatorStateBackend(
 			ClassLoader userClassLoader,
-			Collection<OperatorStateHandle> restoreSnapshots) {
+			Collection<OperatorStateHandle> restoreSnapshots) throws IOException {
 
-		Preconditions.checkNotNull(userClassLoader);
-		this.javaSerializer = new JavaSerializer<>(userClassLoader);
-		this.restoreSnapshots = restoreSnapshots;
+		this.userClassloader = Preconditions.checkNotNull(userClassLoader);
+		this.javaSerializer = new JavaSerializer<>();
 		this.registeredStates = new HashMap<>();
 		this.closeStreamOnCancelRegistry = new CloseableRegistry();
+		this.restoreSnapshots = restoreSnapshots;
+		restoreState();
 	}
 
 	/**
 	 * Creates an empty OperatorStateStore.
 	 */
-	public DefaultOperatorStateBackend(ClassLoader userClassLoader) {
+	public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
 		this(userClassLoader, null);
 	}
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
@@ -82,8 +86,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 	
 	@Override
-	public <S> ListState<S> getOperatorState(
-			ListStateDescriptor<S> stateDescriptor) throws IOException {
+	public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws IOException {
 
 		Preconditions.checkNotNull(stateDescriptor);
 
@@ -95,41 +98,93 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 		if (null == partitionableListState) {
 
-			partitionableListState = new PartitionableListState<>(partitionStateSerializer);
-
+			partitionableListState = new PartitionableListState<>(name, partitionStateSerializer);
 			registeredStates.put(name, partitionableListState);
+		} else {
+			Preconditions.checkState(
+					partitionableListState.getPartitionStateSerializer().
+							isCompatibleWith(stateDescriptor.getSerializer()),
+					"Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() +
+							", found: " + partitionableListState.getPartitionStateSerializer());
+		}
+
+		return partitionableListState;
+	}
+
+	private void restoreState() throws IOException {
+
+		if (null == restoreSnapshots) {
+			return;
+		}
 
-			// Try to restore previous state if state handles to snapshots are provided
-			if (restoreSnapshots != null) {
-				for (OperatorStateHandle stateHandle : restoreSnapshots) {
-					//TODO we coud be even more gc friendly be removing handles from the collections one the map is empty
-					// search and remove to be gc friendly
-					long[] offsets = stateHandle.getStateNameToPartitionOffsets().remove(name);
+		for (OperatorStateHandle stateHandle : restoreSnapshots) {
 
-					if (offsets != null) {
+			if (stateHandle == null) {
+				continue;
+			}
+
+			FSDataInputStream in = stateHandle.openInputStream();
+			closeStreamOnCancelRegistry.registerClosable(in);
+
+			ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
 
-						FSDataInputStream in = stateHandle.openInputStream();
-						try {
-							closeStreamOnCancelRegistry.registerClosable(in);
+			try {
+				Thread.currentThread().setContextClassLoader(userClassloader);
+				OperatorBackendSerializationProxy backendSerializationProxy =
+						new OperatorBackendSerializationProxy(userClassloader);
 
-							DataInputView div = new DataInputViewStreamWrapper(in);
+				backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
 
-							for (int i = 0; i < offsets.length; ++i) {
+				List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+						backendSerializationProxy.getNamedStateSerializationProxies();
 
-								in.seek(offsets[i]);
-								S partitionState = partitionStateSerializer.deserialize(div);
-								partitionableListState.add(partitionState);
-							}
-						} finally {
-							closeStreamOnCancelRegistry.unregisterClosable(in);
-							in.close();
-						}
+				// Recreate all PartitionableListStates from the meta info
+				for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
+					PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+
+					if (null == listState) {
+						listState = new PartitionableListState<>(
+								stateMetaInfo.getName(),
+								stateMetaInfo.getStateSerializer());
+
+						registeredStates.put(listState.getName(), listState);
+					} else {
+						Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith(
+								stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
+								listState.getPartitionStateSerializer() + " is not compatible with " +
+								stateMetaInfo.getStateSerializer());
 					}
 				}
+
+				// Restore all the state in PartitionableListStates
+				for (Map.Entry<String, long[]> nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+					PartitionableListState<?> stateListForName = registeredStates.get(nameToOffsets.getKey());
+
+					Preconditions.checkState(null != stateListForName, "Found state without " +
+							"corresponding meta info: " + nameToOffsets.getKey());
+
+					deserializeStateValues(stateListForName, in, nameToOffsets.getValue());
+				}
+
+			} finally {
+				Thread.currentThread().setContextClassLoader(restoreClassLoader);
+				closeStreamOnCancelRegistry.unregisterClosable(in);
+				IOUtils.closeQuietly(in);
 			}
 		}
+	}
 
-		return partitionableListState;
+	private static <S> void deserializeStateValues(
+			PartitionableListState<S> stateListForName,
+			FSDataInputStream in,
+			long[] offsets) throws IOException {
+
+		DataInputView div = new DataInputViewStreamWrapper(in);
+		TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
+		for (long offset : offsets) {
+			in.seek(offset);
+			stateListForName.add(serializer.deserialize(div));
+		}
 	}
 	
 	@Override
@@ -140,6 +195,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			return new DoneFuture<>(null);
 		}
 
+		List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+				new ArrayList<>(registeredStates.size());
+
+		for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+			PartitionableListState<?> state = entry.getValue();
+			OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+					new OperatorBackendSerializationProxy.StateMetaInfo<>(
+							state.getName(),
+							state.getPartitionStateSerializer());
+			metaInfoList.add(metaInfo);
+		}
+
 		Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
 
 		CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
@@ -150,6 +217,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 			DataOutputView dov = new DataOutputViewStreamWrapper(out);
 
+			OperatorBackendSerializationProxy backendSerializationProxy =
+					new OperatorBackendSerializationProxy(metaInfoList);
+
+			backendSerializationProxy.write(dov);
+
 			dov.writeInt(registeredStates.size());
 			for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
 
@@ -171,29 +243,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		registeredStates.clear();
 	}
 
+	@Override
+	public Set<String> getRegisteredStateNames() {
+		return registeredStates.keySet();
+	}
+
+	@Override
+	public void close() throws IOException {
+		closeStreamOnCancelRegistry.close();
+	}
+
 	static final class PartitionableListState<S> implements ListState<S> {
 
 		private final List<S> internalList;
+		private final String name;
 		private final TypeSerializer<S> partitionStateSerializer;
 
-		public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+		public PartitionableListState(String name, TypeSerializer<S> partitionStateSerializer) {
 			this.internalList = new ArrayList<>();
 			this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
-		}
-
-		@Override
-		public void clear() {
-			internalList.clear();
-		}
-
-		@Override
-		public Iterable<S> get() {
-			return internalList;
-		}
-
-		@Override
-		public void add(S value) {
-			internalList.add(value);
+			this.name = Preconditions.checkNotNull(name);
 		}
 
 		public long[] write(FSDataOutputStream out) throws IOException {
@@ -215,6 +284,29 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			return internalList;
 		}
 
+		public String getName() {
+			return name;
+		}
+
+		public TypeSerializer<S> getPartitionStateSerializer() {
+			return partitionStateSerializer;
+		}
+
+		@Override
+		public void clear() {
+			internalList.clear();
+		}
+
+		@Override
+		public Iterable<S> get() {
+			return internalList;
+		}
+
+		@Override
+		public void add(S value) {
+			internalList.add(value);
+		}
+
 		@Override
 		public String toString() {
 			return "PartitionableListState{" +
@@ -222,16 +314,5 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					'}';
 		}
 	}
-
-	@Override
-	public Set<String> getRegisteredStateNames() {
-		return registeredStates.keySet();
-	}
-
-	@Override
-	public void close() throws IOException {
-		closeStreamOnCancelRegistry.close();
-	}
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 2eb9595..512baf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -34,15 +33,7 @@ import java.io.Serializable;
 @Internal
 final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 
-	private final ClassLoader userClassLoader;
-
-	public JavaSerializer() {
-		this(Thread.currentThread().getContextClassLoader());
-	}
-
-	public JavaSerializer(ClassLoader userClassLoader) {
-		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
-	}
+	private static final long serialVersionUID = 5067491650263321234L;
 
 	@Override
 	public boolean isImmutableType() {
@@ -87,7 +78,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 	@Override
 	public T deserialize(DataInputView source) throws IOException {
 		try {
-			return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+			return InstantiationUtil.deserializeObject(
+					new DataInputViewStream(source), Thread.currentThread().getContextClassLoader());
 		} catch (ClassNotFoundException e) {
 			throw new IOException("Could not deserialize object.", e);
 		}
@@ -107,7 +99,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 
 	@Override
 	public boolean equals(Object obj) {
-		return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+		return obj instanceof JavaSerializer;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
new file mode 100644
index 0000000..dbee6cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+
+	private static final int VERSION = 1;
+
+	private TypeSerializerSerializationProxy<?> keySerializerProxy;
+	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+
+	private ClassLoader userCodeClassLoader;
+
+	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+	}
+
+	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
+		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
+		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+	}
+
+	public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
+		return namedStateSerializationProxies;
+	}
+
+	public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
+		return keySerializerProxy;
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		keySerializerProxy.write(out);
+
+		out.writeShort(namedStateSerializationProxies.size());
+
+		for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) {
+			kvState.write(out);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+		keySerializerProxy.read(in);
+
+		int numKvStates = in.readShort();
+		namedStateSerializationProxies = new ArrayList<>(numKvStates);
+		for (int i = 0; i < numKvStates; ++i) {
+			StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
+			stateSerializationProxy.read(in);
+			namedStateSerializationProxies.add(stateSerializationProxy);
+		}
+	}
+
+//----------------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
+	 * keyed backend.
+	 */
+	public static class StateMetaInfo<N, S> implements IOReadableWritable {
+
+		private StateDescriptor.Type stateType;
+		private String stateName;
+		private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy;
+		private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy;
+
+		private ClassLoader userClassLoader;
+
+		StateMetaInfo(ClassLoader userClassLoader) {
+			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+		}
+
+		public StateMetaInfo(
+				StateDescriptor.Type stateType,
+				String name,
+				TypeSerializer<N> namespaceSerializer,
+				TypeSerializer<S> stateSerializer) {
+
+			this.stateType = Preconditions.checkNotNull(stateType);
+			this.stateName = Preconditions.checkNotNull(name);
+			this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
+			this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
+		}
+
+		public StateDescriptor.Type getStateType() {
+			return stateType;
+		}
+
+		public void setStateType(StateDescriptor.Type stateType) {
+			this.stateType = stateType;
+		}
+
+		public String getStateName() {
+			return stateName;
+		}
+
+		public void setStateName(String stateName) {
+			this.stateName = stateName;
+		}
+
+		public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() {
+			return namespaceSerializerSerializationProxy;
+		}
+
+		public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) {
+			this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy;
+		}
+
+		public TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() {
+			return stateSerializerSerializationProxy;
+		}
+
+		public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) {
+			this.stateSerializerSerializationProxy = stateSerializerSerializationProxy;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(getStateType().ordinal());
+			out.writeUTF(getStateName());
+
+			getNamespaceSerializerSerializationProxy().write(out);
+			getStateSerializerSerializationProxy().write(out);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			int enumOrdinal = in.readInt();
+			setStateType(StateDescriptor.Type.values()[enumOrdinal]);
+			setStateName(in.readUTF());
+
+			namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+			namespaceSerializerSerializationProxy.read(in);
+
+			stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+			stateSerializerSerializationProxy.read(in);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
+
+			if (!getStateName().equals(that.getStateName())) {
+				return false;
+			}
+
+			if (!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())) {
+				return false;
+			}
+
+			return getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
+		}
+
+		@Override
+		public int hashCode() {
+			int result = getStateName().hashCode();
+			result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode();
+			result = 31 * result + getStateSerializerSerializationProxy().hashCode();
+			return result;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
new file mode 100644
index 0000000..61df979
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
+
+	private static final int VERSION = 1;
+
+	private List<StateMetaInfo<?>> namedStateSerializationProxies;
+	private ClassLoader userCodeClassLoader;
+
+	public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+	}
+
+	public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) {
+		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		out.writeShort(namedStateSerializationProxies.size());
+
+		for (StateMetaInfo<?> kvState : namedStateSerializationProxies) {
+			kvState.write(out);
+		}
+	}
+
+	@Override
+	public void read(DataInputView out) throws IOException {
+		super.read(out);
+
+		int numKvStates = out.readShort();
+		namedStateSerializationProxies = new ArrayList<>(numKvStates);
+		for (int i = 0; i < numKvStates; ++i) {
+			StateMetaInfo<?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
+			stateSerializationProxy.read(out);
+			namedStateSerializationProxies.add(stateSerializationProxy);
+		}
+	}
+
+	public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
+		return namedStateSerializationProxies;
+	}
+
+	//----------------------------------------------------------------------------------------------------------------------
+
+	public static class StateMetaInfo<S> implements IOReadableWritable {
+
+		private String name;
+		private TypeSerializer<S> stateSerializer;
+		private ClassLoader userClassLoader;
+
+		private StateMetaInfo(ClassLoader userClassLoader) {
+			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+		}
+
+		public StateMetaInfo(String name, TypeSerializer<S> stateSerializer) {
+			this.name = Preconditions.checkNotNull(name);
+			this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		public TypeSerializer<S> getStateSerializer() {
+			return stateSerializer;
+		}
+
+		public void setStateSerializer(TypeSerializer<S> stateSerializer) {
+			this.stateSerializer = stateSerializer;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeUTF(getName());
+			DataOutputViewStream dos = new DataOutputViewStream(out);
+			InstantiationUtil.serializeObject(dos, getStateSerializer());
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			setName(in.readUTF());
+			DataInputViewStream dis = new DataInputViewStream(in);
+			try {
+				TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader);
+				setStateSerializer(stateSerializer);
+			} catch (ClassNotFoundException exception) {
+				throw new IOException(exception);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
new file mode 100644
index 0000000..62418c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredBackendStateMetaInfo<N, S> {
+
+	private final StateDescriptor.Type stateType;
+	private final String name;
+	private final TypeSerializer<N> namespaceSerializer;
+	private final TypeSerializer<S> stateSerializer;
+
+	public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) {
+		this.stateType = metaInfoProxy.getStateType();
+		this.name = metaInfoProxy.getStateName();
+		this.namespaceSerializer = metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer();
+		this.stateSerializer = metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer();
+	}
+
+	public RegisteredBackendStateMetaInfo(
+			StateDescriptor.Type stateType,
+			String name,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<S> stateSerializer) {
+
+		this.stateType = Preconditions.checkNotNull(stateType);
+		this.name = Preconditions.checkNotNull(name);
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+	}
+
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<S> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
+
+		if (this == other) {
+			return true;
+		}
+
+		if (null == other) {
+			return false;
+		}
+
+		if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
+				&& !other.stateType.equals(StateDescriptor.Type.UNKNOWN)
+				&& !stateType.equals(other.stateType)) {
+			return false;
+		}
+
+		if (!name.equals(other.getName())) {
+			return false;
+		}
+
+		return namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+				&& stateSerializer.isCompatibleWith(other.stateSerializer);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o;
+
+		if (!stateType.equals(that.stateType)) {
+			return false;
+		}
+
+		if (!getName().equals(that.getName())) {
+			return false;
+		}
+
+		if (getNamespaceSerializer() != null ? !getNamespaceSerializer().equals(that.getNamespaceSerializer()) : that.getNamespaceSerializer() != null) {
+			return false;
+		}
+		return getStateSerializer() != null ? getStateSerializer().equals(that.getStateSerializer()) : that.getStateSerializer() == null;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getStateType().hashCode();
+		result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
+		result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6e85b72..d07901b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -49,6 +50,8 @@ import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -61,6 +64,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
 
@@ -122,53 +126,66 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	// ------------------------------------------------------------------------
 	//  state backend operations
 	// ------------------------------------------------------------------------
+
 	@SuppressWarnings("unchecked")
-	@Override
-	public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
-		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
+	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+		String name = stateDesc.getName();
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
+
+		RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
+
+		return tryRegisterStateTable(stateTable, newMetaInfo);
+	}
+
+	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+			StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
 
 		if (stateTable == null) {
-			stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
-			stateTables.put(stateDesc.getName(), stateTable);
+			stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+			stateTables.put(newMetaInfo.getName(), stateTable);
+		} else {
+			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+						stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+			}
+			stateTable.setMetaInfo(newMetaInfo);
 		}
+		return stateTable;
+	}
 
+	@SuppressWarnings("unchecked")
+	@Override
+	public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
-		StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(stateDesc.getName());
+		String name = stateDesc.getName();
+		StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
 
-		if (stateTable == null) {
-			stateTable = new StateTable<>(new ArrayListSerializer<>(stateDesc.getSerializer()), namespaceSerializer, keyGroupRange);
-			stateTables.put(stateDesc.getName(), stateTable);
-		}
+		RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
 
+		stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
 		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
-		StateTable<K, N, T> stateTable = (StateTable<K, N, T>) stateTables.get(stateDesc.getName());
-
-		if (stateTable == null) {
-			stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
-			stateTables.put(stateDesc.getName(), stateTable);
-		}
-
+		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 	@SuppressWarnings("unchecked")
 	@Override
 	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-		StateTable<K, N, ACC> stateTable = (StateTable<K, N, ACC>) stateTables.get(stateDesc.getName());
-
-		if (stateTable == null) {
-			stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
-			stateTables.put(stateDesc.getName(), stateTable);
-		}
-
+		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
@@ -192,23 +209,28 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					"Too many KV-States: " + stateTables.size() +
 							". Currently at most " + Short.MAX_VALUE + " states are supported");
 
-			outView.writeShort(stateTables.size());
+			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
 
 			Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
 
 			for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
 
-				outView.writeUTF(kvState.getKey());
-
-				TypeSerializer<?> namespaceSerializer = kvState.getValue().getNamespaceSerializer();
-				TypeSerializer<?> stateSerializer = kvState.getValue().getStateSerializer();
-
-				InstantiationUtil.serializeObject(stream, namespaceSerializer);
-				InstantiationUtil.serializeObject(stream, stateSerializer);
+				RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+						metaInfo.getStateType(),
+						metaInfo.getName(),
+						metaInfo.getNamespaceSerializer(),
+						metaInfo.getStateSerializer());
 
+				metaInfoProxyList.add(metaInfoProxy);
 				kVStateToId.put(kvState.getKey(), kVStateToId.size());
 			}
 
+			KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+			serializationProxy.write(outView);
+
 			int offsetCounter = 0;
 			long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
 
@@ -278,23 +300,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
-				int numKvStates = inView.readShort();
+				KeyedBackendSerializationProxy serializationProxy =
+						new KeyedBackendSerializationProxy(userCodeClassLoader);
 
-				for (int i = 0; i < numKvStates; ++i) {
-					String stateName = inView.readUTF();
+				serializationProxy.read(inView);
 
-					TypeSerializer<?> namespaceSerializer =
-							InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
-					TypeSerializer<?> stateSerializer =
-							InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
+				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+						serializationProxy.getNamedStateSerializationProxies();
 
-					StateTable<K, ?, ?> stateTable = stateTables.get(stateName);
+				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+					StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
 
 					//important: only create a new table we did not already create it previously
 					if (null == stateTable) {
-						stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
-						stateTables.put(stateName, stateTable);
-						kvStatesById.put(numRegisteredKvStates, stateName);
+
+						RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+								new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+						stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+						stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+						kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
 						++numRegisteredKvStates;
 					}
 				}
@@ -307,7 +333,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					int writtenKeyGroupIndex = inView.readInt();
 					assert writtenKeyGroupIndex == keyGroupIndex;
 
-					for (int i = 0; i < numKvStates; i++) {
+					for (int i = 0; i < metaInfoList.size(); i++) {
 						int kvStateId = inView.readShort();
 
 						byte isPresent = inView.readByte();
@@ -419,11 +445,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
 			}
 
-			StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+			RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+					new RegisteredBackendStateMetaInfo<>(
+							StateDescriptor.Type.UNKNOWN,
+							nameToState.getKey(),
+							namespaceSerializer,
+							stateSerializer);
+
+			StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
 			stateTable.getState().set(0, rawResultMap);
 
 			// add named state to the backend
-			getStateTables().put(nameToState.getKey(), stateTable);
+			getStateTables().put(registeredBackendStateMetaInfo.getName(), stateTable);
 		}
 	}