You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:01 UTC

[3/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades for managed state

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index d571dcc..91d7aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -18,15 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -34,23 +29,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
+ * Serialization proxy for all meta data in operator state backends. In the future we might also requiresMigration the actual state
  * serialization logic here.
  */
 public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
 
-	private static final int VERSION = 1;
+	public static final int VERSION = 2;
 
-	private List<StateMetaInfo<?>> namedStateSerializationProxies;
+	private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots;
 	private ClassLoader userCodeClassLoader;
 
+	private int restoredVersion;
+
 	public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 	}
 
-	public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) {
-		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
-		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+	public OperatorBackendSerializationProxy(
+			List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots) {
+
+		this.stateMetaInfoSnapshots = Preconditions.checkNotNull(stateMetaInfoSnapshots);
+		Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
+
+		this.restoredVersion = VERSION;
 	}
 
 	@Override
@@ -59,129 +60,44 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
 	}
 
 	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-
-		out.writeShort(namedStateSerializationProxies.size());
-
-		for (StateMetaInfo<?> kvState : namedStateSerializationProxies) {
-			kvState.write(out);
-		}
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		super.resolveVersionRead(foundVersion);
+		this.restoredVersion = foundVersion;
 	}
 
 	@Override
-	public void read(DataInputView out) throws IOException {
-		super.read(out);
-
-		int numKvStates = out.readShort();
-		namedStateSerializationProxies = new ArrayList<>(numKvStates);
-		for (int i = 0; i < numKvStates; ++i) {
-			StateMetaInfo<?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
-			stateSerializationProxy.read(out);
-			namedStateSerializationProxies.add(stateSerializationProxy);
-		}
+	public boolean isCompatibleVersion(int version) {
+		// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+		return super.isCompatibleVersion(version) || version == 1;
 	}
 
-	public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
-		return namedStateSerializationProxies;
-	}
-
-	//----------------------------------------------------------------------------------------------------------------------
-
-	public static class StateMetaInfo<S> implements IOReadableWritable {
-
-		private String name;
-		private TypeSerializer<S> stateSerializer;
-		private OperatorStateHandle.Mode mode;
-
-		private ClassLoader userClassLoader;
-
-		@VisibleForTesting
-		public StateMetaInfo(ClassLoader userClassLoader) {
-			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
-		}
-
-		public StateMetaInfo(String name, TypeSerializer<S> stateSerializer, OperatorStateHandle.Mode mode) {
-			this.name = Preconditions.checkNotNull(name);
-			this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
-			this.mode = Preconditions.checkNotNull(mode);
-		}
-
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-
-		public TypeSerializer<S> getStateSerializer() {
-			return stateSerializer;
-		}
-
-		public void setStateSerializer(TypeSerializer<S> stateSerializer) {
-			this.stateSerializer = stateSerializer;
-		}
-
-		public OperatorStateHandle.Mode getMode() {
-			return mode;
-		}
-
-		public void setMode(OperatorStateHandle.Mode mode) {
-			this.mode = mode;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeUTF(getName());
-			out.writeByte(getMode().ordinal());
-			DataOutputViewStream dos = new DataOutputViewStream(out);
-			InstantiationUtil.serializeObject(dos, getStateSerializer());
-		}
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
 
-		@Override
-		public void read(DataInputView in) throws IOException {
-			setName(in.readUTF());
-			setMode(OperatorStateHandle.Mode.values()[in.readByte()]);
-			DataInputViewStream dis = new DataInputViewStream(in);
-			try {
-				TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader);
-				setStateSerializer(stateSerializer);
-			} catch (ClassNotFoundException exception) {
-				throw new IOException(exception);
-			}
+		out.writeShort(stateMetaInfoSnapshots.size());
+		for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState : stateMetaInfoSnapshots) {
+			OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(VERSION, kvState)
+				.writeStateMetaInfo(out);
 		}
+	}
 
-		@Override
-		public boolean equals(Object o) {
-
-			if (this == o) {
-				return true;
-			}
-
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			StateMetaInfo<?> metaInfo = (StateMetaInfo<?>) o;
-
-			if (!getName().equals(metaInfo.getName())) {
-				return false;
-			}
-
-			if (!getStateSerializer().equals(metaInfo.getStateSerializer())) {
-				return false;
-			}
-
-			return getMode() == metaInfo.getMode();
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		int numKvStates = in.readShort();
+		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+		for (int i = 0; i < numKvStates; i++) {
+			stateMetaInfoSnapshots.add(
+				OperatorBackendStateMetaInfoSnapshotReaderWriters
+					.getReaderForVersion(restoredVersion, userCodeClassLoader)
+					.readStateMetaInfo(in));
 		}
+	}
 
-		@Override
-		public int hashCode() {
-			int result = getName().hashCode();
-			result = 31 * result + getStateSerializer().hashCode();
-			result = 31 * result + getMode().hashCode();
-			return result;
-		}
+	public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getStateMetaInfoSnapshots() {
+		return stateMetaInfoSnapshots;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..9ab106b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here for documentation of history backlog.
+ */
+public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
+
+	// -------------------------------------------------------------------------------
+	//  Writers
+	//   - v1: Flink 1.2.x
+	//   - v2: Flink 1.3.x
+	// -------------------------------------------------------------------------------
+
+	public static <S> OperatorBackendStateMetaInfoWriter getWriterForVersion(
+			int version, RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+
+		switch (version) {
+			case 1:
+				return new OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo);
+
+			// current version
+			case OperatorBackendSerializationProxy.VERSION:
+				return new OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo);
+
+			default:
+				// guard for future
+				throw new IllegalStateException(
+						"Unrecognized operator backend state meta info writer version: " + version);
+		}
+	}
+
+	public interface OperatorBackendStateMetaInfoWriter {
+		void writeStateMetaInfo(DataOutputView out) throws IOException;
+	}
+
+	public static abstract class AbstractOperatorBackendStateMetaInfoWriter<S>
+			implements OperatorBackendStateMetaInfoWriter {
+
+		protected final RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo;
+
+		public AbstractOperatorBackendStateMetaInfoWriter(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
+		}
+	}
+
+	public static class OperatorBackendStateMetaInfoWriterV1<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+		public OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+			super(stateMetaInfo);
+		}
+
+		@Override
+		public void writeStateMetaInfo(DataOutputView out) throws IOException {
+			out.writeUTF(stateMetaInfo.getName());
+			out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+			new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out);
+		}
+	}
+
+	public static class OperatorBackendStateMetaInfoWriterV2<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+		public OperatorBackendStateMetaInfoWriterV2(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+			super(stateMetaInfo);
+		}
+
+		@Override
+		public void writeStateMetaInfo(DataOutputView out) throws IOException {
+			out.writeUTF(stateMetaInfo.getName());
+			out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+
+			// write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
+			try (
+				ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+				DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
+
+				new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper);
+
+				// write the start offset of the config snapshot
+				out.writeInt(outWithPos.getPosition());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(
+					outViewWrapper,
+					stateMetaInfo.getPartitionStateSerializerConfigSnapshot());
+
+				// write the total number of bytes and flush
+				out.writeInt(outWithPos.getPosition());
+				out.write(outWithPos.getBuf(), 0, outWithPos.getPosition());
+			}
+		}
+	}
+
+	// -------------------------------------------------------------------------------
+	//  Readers
+	//   - v1: Flink 1.2.x
+	//   - v2: Flink 1.3.x
+	// -------------------------------------------------------------------------------
+
+	public static <S> OperatorBackendStateMetaInfoReader<S> getReaderForVersion(
+			int version, ClassLoader userCodeClassLoader) {
+
+		switch (version) {
+			case 1:
+				return new OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader);
+
+			// current version
+			case OperatorBackendSerializationProxy.VERSION:
+				return new OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader);
+
+			default:
+				// guard for future
+				throw new IllegalStateException(
+					"Unrecognized operator backend state meta info reader version: " + version);
+		}
+	}
+
+	public interface OperatorBackendStateMetaInfoReader<S> {
+		RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException;
+	}
+
+	public static abstract class AbstractOperatorBackendStateMetaInfoReader<S>
+		implements OperatorBackendStateMetaInfoReader<S> {
+
+		protected final ClassLoader userCodeClassLoader;
+
+		public AbstractOperatorBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
+			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		}
+	}
+
+	public static class OperatorBackendStateMetaInfoReaderV1<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
+
+		public OperatorBackendStateMetaInfoReaderV1(ClassLoader userCodeClassLoader) {
+			super(userCodeClassLoader);
+		}
+
+		@Override
+		public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
+			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
+				new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+			stateMetaInfo.setName(in.readUTF());
+			stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+			DataInputViewStream dis = new DataInputViewStream(in);
+			try {
+				TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+				stateMetaInfo.setPartitionStateSerializer(stateSerializer);
+			} catch (ClassNotFoundException exception) {
+				throw new IOException(exception);
+			}
+
+			// old versions do not contain the partition state serializer's configuration snapshot
+			stateMetaInfo.setPartitionStateSerializerConfigSnapshot(null);
+
+			return stateMetaInfo;
+		}
+	}
+
+	public static class OperatorBackendStateMetaInfoReaderV2<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
+
+		public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
+			super(userCodeClassLoader);
+		}
+
+		@Override
+		public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
+			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
+				new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+			stateMetaInfo.setName(in.readUTF());
+			stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+			// read start offset of configuration snapshot
+			int configSnapshotStartOffset = in.readInt();
+
+			int totalBytes = in.readInt();
+
+			byte[] buffer = new byte[totalBytes];
+			in.readFully(buffer);
+
+			ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+			DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos);
+
+			try {
+				final TypeSerializerSerializationProxy<S> partitionStateSerializerProxy =
+					new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+				partitionStateSerializerProxy.read(inViewWrapper);
+				stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
+			} catch (IOException e) {
+				stateMetaInfo.setPartitionStateSerializer(null);
+			}
+
+			// make sure we start from the partition state serializer bytes position
+			inWithPos.setPosition(configSnapshotStartOffset);
+			stateMetaInfo.setPartitionStateSerializerConfigSnapshot(
+				TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+
+			return stateMetaInfo;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
deleted file mode 100644
index 0d4b3c8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredBackendStateMetaInfo<N, S> {
-
-	private final StateDescriptor.Type stateType;
-	private final String name;
-	private final TypeSerializer<N> namespaceSerializer;
-	private final TypeSerializer<S> stateSerializer;
-
-	public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) {
-		this(
-				metaInfoProxy.getStateType(),
-				metaInfoProxy.getStateName(),
-				metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer(),
-				metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer());
-	}
-
-	public RegisteredBackendStateMetaInfo(
-			StateDescriptor.Type stateType,
-			String name,
-			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<S> stateSerializer) {
-
-		this.stateType = checkNotNull(stateType);
-		this.name = checkNotNull(name);
-		this.namespaceSerializer = checkNotNull(namespaceSerializer);
-		this.stateSerializer = checkNotNull(stateSerializer);
-	}
-
-	public StateDescriptor.Type getStateType() {
-		return stateType;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public TypeSerializer<N> getNamespaceSerializer() {
-		return namespaceSerializer;
-	}
-
-	public TypeSerializer<S> getStateSerializer() {
-		return stateSerializer;
-	}
-
-	public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) {
-
-		if (this == other) {
-			return true;
-		}
-
-		if (null == other) {
-			return false;
-		}
-
-		if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
-				&& !other.stateType.equals(StateDescriptor.Type.UNKNOWN)
-				&& !stateType.equals(other.stateType)) {
-			return false;
-		}
-
-		if (!name.equals(other.getName())) {
-			return false;
-		}
-
-		return (stateSerializer.canRestoreFrom(other.stateSerializer)) &&
-				(namespaceSerializer.canRestoreFrom(other.namespaceSerializer)
-						// we also check if there is just a migration proxy that should be replaced by any real serializer
-						|| other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o;
-
-		if (!stateType.equals(that.stateType)) {
-			return false;
-		}
-
-		if (!getName().equals(that.getName())) {
-			return false;
-		}
-
-		return getStateSerializer().equals(that.getStateSerializer())
-				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredBackendStateMetaInfo{" +
-				"stateType=" + stateType +
-				", name='" + name + '\'' +
-				", namespaceSerializer=" + namespaceSerializer +
-				", stateSerializer=" + stateSerializer +
-				'}';
-	}
-
-	@Override
-	public int hashCode() {
-		int result = getName().hashCode();
-		result = 31 * result + getStateType().hashCode();
-		result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
-		result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
new file mode 100644
index 0000000..e1a7e06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyedBackendStateMetaInfo<N, S> {
+
+	private final StateDescriptor.Type stateType;
+	private final String name;
+	private final TypeSerializer<N> namespaceSerializer;
+	private final TypeSerializer<S> stateSerializer;
+
+	public RegisteredKeyedBackendStateMetaInfo(
+			StateDescriptor.Type stateType,
+			String name,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<S> stateSerializer) {
+
+		this.stateType = checkNotNull(stateType);
+		this.name = checkNotNull(name);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.stateSerializer = checkNotNull(stateSerializer);
+	}
+
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	public TypeSerializer<S> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	public Snapshot<N, S> snapshot() {
+		return new Snapshot<>(
+			stateType,
+			name,
+			namespaceSerializer.duplicate(),
+			stateSerializer.duplicate(),
+			namespaceSerializer.snapshotConfiguration(),
+			stateSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
+
+		if (!stateType.equals(that.stateType)) {
+			return false;
+		}
+
+		if (!getName().equals(that.getName())) {
+			return false;
+		}
+
+		return getStateSerializer().equals(that.getStateSerializer())
+				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredKeyedBackendStateMetaInfo{" +
+				"stateType=" + stateType +
+				", name='" + name + '\'' +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				'}';
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getStateType().hashCode();
+		result = 31 * result + getNamespaceSerializer().hashCode();
+		result = 31 * result + getStateSerializer().hashCode();
+		return result;
+	}
+
+	/**
+	 * A consistent snapshot of a {@link RegisteredKeyedBackendStateMetaInfo}.
+	 */
+	public static class Snapshot<N, S> {
+
+		private StateDescriptor.Type stateType;
+		private String name;
+		private TypeSerializer<N> namespaceSerializer;
+		private TypeSerializer<S> stateSerializer;
+		private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot;
+		private TypeSerializerConfigSnapshot stateSerializerConfigSnapshot;
+
+		/** Empty constructor used when restoring the state meta info snapshot. */
+		Snapshot() {}
+
+		private Snapshot(
+				StateDescriptor.Type stateType,
+				String name,
+				TypeSerializer<N> namespaceSerializer,
+				TypeSerializer<S> stateSerializer,
+				TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot,
+				TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
+
+			this.stateType = Preconditions.checkNotNull(stateType);
+			this.name = Preconditions.checkNotNull(name);
+			this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+			this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+			this.namespaceSerializerConfigSnapshot = Preconditions.checkNotNull(namespaceSerializerConfigSnapshot);
+			this.stateSerializerConfigSnapshot = Preconditions.checkNotNull(stateSerializerConfigSnapshot);
+		}
+
+		public StateDescriptor.Type getStateType() {
+			return stateType;
+		}
+
+		void setStateType(StateDescriptor.Type stateType) {
+			this.stateType = stateType;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		void setName(String name) {
+			this.name = name;
+		}
+
+		public TypeSerializer<N> getNamespaceSerializer() {
+			return namespaceSerializer;
+		}
+
+		void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) {
+			this.namespaceSerializer = namespaceSerializer;
+		}
+
+		public TypeSerializer<S> getStateSerializer() {
+			return stateSerializer;
+		}
+
+		void setStateSerializer(TypeSerializer<S> stateSerializer) {
+			this.stateSerializer = stateSerializer;
+		}
+
+		public TypeSerializerConfigSnapshot getNamespaceSerializerConfigSnapshot() {
+			return namespaceSerializerConfigSnapshot;
+		}
+
+		void setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot) {
+			this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot;
+		}
+
+		public TypeSerializerConfigSnapshot getStateSerializerConfigSnapshot() {
+			return stateSerializerConfigSnapshot;
+		}
+
+		void setStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
+			this.stateSerializerConfigSnapshot = stateSerializerConfigSnapshot;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			Snapshot<?, ?> that = (Snapshot<?, ?>) o;
+
+			if (!stateType.equals(that.stateType)) {
+				return false;
+			}
+
+			if (!getName().equals(that.getName())) {
+				return false;
+			}
+
+			// need to check for nulls because serializer and config snapshots may be null on restore
+			return
+				((getStateSerializer() == null && that.getStateSerializer() == null)
+					|| getStateSerializer().equals(that.getStateSerializer()))
+				&& ((getNamespaceSerializer() == null && that.getNamespaceSerializer() == null)
+					|| getNamespaceSerializer().equals(that.getNamespaceSerializer()))
+				&& ((getNamespaceSerializerConfigSnapshot() == null && that.getNamespaceSerializerConfigSnapshot() == null)
+					|| getNamespaceSerializerConfigSnapshot().equals(that.getNamespaceSerializerConfigSnapshot()))
+				&& ((getStateSerializerConfigSnapshot() == null && that.getStateSerializerConfigSnapshot() == null)
+					|| getStateSerializerConfigSnapshot().equals(that.getStateSerializerConfigSnapshot()));
+		}
+
+		@Override
+		public int hashCode() {
+			// need to check for nulls because serializer and config snapshots may be null on restore
+			int result = getName().hashCode();
+			result = 31 * result + getStateType().hashCode();
+			result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
+			result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
+			result = 31 * result + (getNamespaceSerializerConfigSnapshot() != null ? getNamespaceSerializerConfigSnapshot().hashCode() : 0);
+			result = 31 * result + (getStateSerializerConfigSnapshot() != null ? getStateSerializerConfigSnapshot().hashCode() : 0);
+			return result;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
new file mode 100644
index 0000000..b43fc9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorBackendStateMetaInfo<S> {
+
+	/**
+	 * The name of the state, as registered by the user
+	 */
+	private final String name;
+
+	/**
+	 * The mode how elements in this state are assigned to tasks during restore
+	 */
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/**
+	 * The type serializer for the elements in the state list
+	 */
+	private final TypeSerializer<S> partitionStateSerializer;
+
+	public RegisteredOperatorBackendStateMetaInfo(
+			String name,
+			TypeSerializer<S> partitionStateSerializer,
+			OperatorStateHandle.Mode assignmentMode) {
+
+		this.name = Preconditions.checkNotNull(name);
+		this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	public TypeSerializer<S> getPartitionStateSerializer() {
+		return partitionStateSerializer;
+	}
+
+	public Snapshot<S> snapshot() {
+		return new Snapshot<>(
+			name,
+			assignmentMode,
+			partitionStateSerializer.duplicate(),
+			partitionStateSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (obj == null) {
+			return false;
+		}
+
+		return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
+			&& name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
+			&& assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
+			&& partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getAssignmentMode().hashCode();
+		result = 31 * result + getPartitionStateSerializer().hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredOperatorBackendStateMetaInfo{" +
+			"name='" + name + "\'" +
+			", assignmentMode=" + assignmentMode +
+			", partitionStateSerializer=" + partitionStateSerializer +
+			'}';
+	}
+
+	/**
+	 * A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}.
+	 */
+	public static class Snapshot<S> {
+
+		private String name;
+		private OperatorStateHandle.Mode assignmentMode;
+		private TypeSerializer<S> partitionStateSerializer;
+		private TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot;
+
+		/** Empty constructor used when restoring the state meta info snapshot. */
+		Snapshot() {}
+
+		private Snapshot(
+				String name,
+				OperatorStateHandle.Mode assignmentMode,
+				TypeSerializer<S> partitionStateSerializer,
+				TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
+
+			this.name = Preconditions.checkNotNull(name);
+			this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+			this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+			this.partitionStateSerializerConfigSnapshot = Preconditions.checkNotNull(partitionStateSerializerConfigSnapshot);
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		void setName(String name) {
+			this.name = name;
+		}
+
+		public OperatorStateHandle.Mode getAssignmentMode() {
+			return assignmentMode;
+		}
+
+		void setAssignmentMode(OperatorStateHandle.Mode assignmentMode) {
+			this.assignmentMode = assignmentMode;
+		}
+
+		public TypeSerializer<S> getPartitionStateSerializer() {
+			return partitionStateSerializer;
+		}
+
+		void setPartitionStateSerializer(TypeSerializer<S> partitionStateSerializer) {
+			this.partitionStateSerializer = partitionStateSerializer;
+		}
+
+		public TypeSerializerConfigSnapshot getPartitionStateSerializerConfigSnapshot() {
+			return partitionStateSerializerConfigSnapshot;
+		}
+
+		void setPartitionStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
+			this.partitionStateSerializerConfigSnapshot = partitionStateSerializerConfigSnapshot;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj == null) {
+				return false;
+			}
+
+			// need to check for nulls because serializer and config snapshots may be null on restore
+			return (obj instanceof Snapshot)
+				&& name.equals(((Snapshot) obj).getName())
+				&& assignmentMode.equals(((Snapshot) obj).getAssignmentMode())
+				&& ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null)
+					|| partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer()))
+				&& ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null)
+					|| partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot()));
+		}
+
+		@Override
+		public int hashCode() {
+			// need to check for nulls because serializer and config snapshots may be null on restore
+			int result = getName().hashCode();
+			result = 31 * result + getAssignmentMode().hashCode();
+			result = 31 * result + (getPartitionStateSerializer() != null ? getPartitionStateSerializer().hashCode() : 0);
+			result = 31 * result + (getPartitionStateSerializerConfigSnapshot() != null ? getPartitionStateSerializerConfigSnapshot().hashCode() : 0);
+			return result;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
new file mode 100644
index 0000000..978f28d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Utilities related to state migration, commonly used in the state backends.
+ */
+public class StateMigrationUtil {
+
+	/**
+	 * Resolves the final compatibility result of two serializers by taking into account compound information,
+	 * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer.
+	 *
+	 * The final result is determined as follows:
+	 *   1. If there is no configuration snapshot of the preceding serializer,
+	 *      assumes the new serializer to be compatible.
+	 *   2. Confront the configuration snapshot with the new serializer.
+	 *   3. If the result is compatible, just return that as the result.
+	 *   4. If not compatible and requires migration, check if the preceding serializer is valid.
+	 *      If yes, use that as the convert deserializer for state migration.
+	 *   5. If the preceding serializer is not valid, check if the result came with a convert deserializer.
+	 *      If yes, use that for state migration and simply return the result.
+	 *   6. If all of above fails, state migration is required but could not be performed; throw exception.
+	 *
+	 * @param precedingSerializer the preceding serializer used to write the data
+	 * @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder
+	 * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
+	 * @param newSerializer the new serializer to ensure compatibility with
+	 *
+	 * @param <T> Type of the data handled by the serializers
+	 *
+	 * @return the final resolved compatiblity result
+	 */
+	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+			TypeSerializer<T> precedingSerializer,
+			Class<?> dummySerializerClassTag,
+			TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
+			TypeSerializer<T> newSerializer) {
+
+		if (precedingSerializerConfigSnapshot != null) {
+			CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+
+			if (!initialResult.requiresMigration()) {
+				return initialResult;
+			} else {
+				if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
+					// if the preceding serializer exists and is not a dummy, use
+					// that for converting instead of the provided convert deserializer
+					return CompatibilityResult.requiresMigration(precedingSerializer);
+				} else if (initialResult.getConvertDeserializer() != null) {
+					return initialResult;
+				} else {
+					throw new RuntimeException(
+						"State migration required, but there is no available serializer capable of reading previous data.");
+				}
+			}
+		} else {
+			// if the configuration snapshot of the preceding serializer cannot be provided,
+			// we can only simply assume that the new serializer is compatible
+			return CompatibilityResult.compatible();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..2800899 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 
 import java.io.IOException;
 
@@ -89,4 +90,11 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
 	public boolean canEqual(Object obj) {
 		return obj instanceof VoidNamespaceSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		// we might be replacing a migration namespace serializer, in which case we just assume compatibility
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index d63b6d3..7b61da1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
@@ -196,7 +196,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 * @param keyContext the key context.
 	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
 	 */
-	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
 		this(keyContext, metaInfo, 1024);
 	}
 
@@ -209,7 +209,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 * @throws IllegalArgumentException when the capacity is less than zero.
 	 */
 	@SuppressWarnings("unchecked")
-	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) {
 		super(keyContext, metaInfo);
 
 		// initialized tables to EMPTY_TABLE.
@@ -532,12 +532,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	}
 
 	@Override
-	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+	public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
 		return metaInfo;
 	}
 
 	@Override
-	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+	public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
 		this.metaInfo = metaInfo;
 	}
 
@@ -1063,4 +1063,4 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 			throw new UnsupportedOperationException("Read-only iterator");
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aecc72e..866ed28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -132,8 +132,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<V> valueSerializer) {
 
-		final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
-				new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
 
 		@SuppressWarnings("unchecked")
 		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
@@ -142,12 +142,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			stateTable = newStateTable(newMetaInfo);
 			stateTables.put(stateName, stateTable);
 		} else {
-			if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) {
-				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
-						stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+			// TODO with eager registration in place, these checks should be moved to restorePartitionedState()
+
+			Preconditions.checkState(
+				stateName.equals(stateTable.getMetaInfo().getName()),
+				"Incompatible state names. " +
+					"Was [" + stateTable.getMetaInfo().getName() + "], " +
+					"registered with [" + newMetaInfo.getName() + "].");
+
+			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+					&& !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+				Preconditions.checkState(
+					newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+					"Incompatible state types. " +
+						"Was [" + stateTable.getMetaInfo().getStateType() + "], " +
+						"registered with [" + newMetaInfo.getStateType() + "].");
 			}
+
 			stateTable.setMetaInfo(newMetaInfo);
 		}
+
 		return stateTable;
 	}
 
@@ -240,21 +255,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"Too many KV-States: " + stateTables.size() +
 						". Currently at most " + Short.MAX_VALUE + " states are supported");
 
-		List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+		List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = new ArrayList<>(stateTables.size());
 
 		final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
 
 		final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
 
 		for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-			RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
-			KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
-					metaInfo.getStateType(),
-					metaInfo.getName(),
-					metaInfo.getNamespaceSerializer(),
-					metaInfo.getStateSerializer());
-
-			metaInfoProxyList.add(metaInfoProxy);
+			metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot());
 			kVStateToId.put(kvState.getKey(), kVStateToId.size());
 			StateTable<K, ?, ?> stateTable = kvState.getValue();
 			if (null != stateTable) {
@@ -263,7 +271,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		final KeyedBackendSerializationProxy serializationProxy =
-				new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+				new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
 
 		//--------------------------------------------------- this becomes the end of sync part
 
@@ -376,23 +384,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.read(inView);
 
-				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
-						serializationProxy.getNamedStateSerializationProxies();
+				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
+						serializationProxy.getStateMetaInfoSnapshots();
 
-				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
 
-					StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously
 					if (null == stateTable) {
 
-						RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
-								new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+						RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+								new RegisteredKeyedBackendStateMetaInfo<>(
+									restoredMetaInfo.getStateType(),
+									restoredMetaInfo.getName(),
+									restoredMetaInfo.getNamespaceSerializer(),
+									restoredMetaInfo.getStateSerializer());
 
-						stateTable = newStateTable(registeredBackendStateMetaInfo);
-						stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
-						kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
+						stateTable = newStateTable(registeredKeyedBackendStateMetaInfo);
+						stateTables.put(restoredMetaInfo.getName(), stateTable);
+						kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
 						++numRegisteredKvStates;
+					} else {
+						// TODO with eager state registration in place, check here for serializer migration strategies
 					}
 				}
 
@@ -410,7 +424,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
 							"Unexpected key-group in restore.");
 
-					for (int i = 0; i < metaInfoList.size(); i++) {
+					for (int i = 0; i < restoredMetaInfos.size(); i++) {
 						int kvStateId = inView.readShort();
 						StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
 
@@ -509,7 +523,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return sum;
 	}
 
-	public <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+	public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
 		return asynchronousSnapshots ?
 				new CopyOnWriteStateTable<>(this, newMetaInfo) :
 				new NestedMapsStateTable<>(this, newMetaInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 22f344d..75c31db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -63,7 +63,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 	 * @param keyContext the key context.
 	 * @param metaInfo the meta information for this state table.
 	 */
-	public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+	public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
 		super(keyContext, metaInfo);
 		this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 62fc869..c1cdcc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -42,14 +42,14 @@ public abstract class StateTable<K, N, S> {
 	/**
 	 * Combined meta information such as name and serializers for this state
 	 */
-	protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+	protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo;
 
 	/**
 	 *
 	 * @param keyContext the key context provides the key scope for all put/get/delete operations.
 	 * @param metaInfo the meta information, including the type serializer for state copy-on-write.
 	 */
-	public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+	public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
 		this.keyContext = Preconditions.checkNotNull(keyContext);
 		this.metaInfo = Preconditions.checkNotNull(metaInfo);
 	}
@@ -168,11 +168,11 @@ public abstract class StateTable<K, N, S> {
 		return metaInfo.getNamespaceSerializer();
 	}
 
-	public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+	public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
 		return metaInfo;
 	}
 
-	public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+	public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
 		this.metaInfo = metaInfo;
 	}
 
@@ -186,4 +186,4 @@ public abstract class StateTable<K, N, S> {
 
 	@VisibleForTesting
 	public abstract int sizeOfNamespace(Object namespace);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index 53ec349..d7bc94e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -47,7 +47,8 @@ class StateTableByKeyGroupReaders {
 			case 1:
 				return new StateTableByKeyGroupReaderV1<>(table);
 			case 2:
-				return new StateTableByKeyGroupReaderV2<>(table);
+			case 3:
+				return new StateTableByKeyGroupReaderV2V3<>(table);
 			default:
 				throw new IllegalArgumentException("Unknown version: " + version);
 		}
@@ -110,10 +111,10 @@ class StateTableByKeyGroupReaders {
 		}
 	}
 
-	private static final class StateTableByKeyGroupReaderV2<K, N, S>
+	private static final class StateTableByKeyGroupReaderV2V3<K, N, S>
 			extends AbstractStateTableByKeyGroupReader<K, N, S> {
 
-		StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+		StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) {
 			super(stateTable);
 		}
 
@@ -133,4 +134,4 @@ class StateTableByKeyGroupReaders {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index e872526..4bdc5e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -127,4 +129,14 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	public int hashCode() {
 		return IntListSerializer.class.hashCode();
 	}
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index e4a9264..0ae5e71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -136,4 +138,14 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 			return obj.getClass() == IntPairSerializerFactory.class;
 		};
 	}
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index b62b097..17ee5f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
@@ -104,4 +106,14 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	public int hashCode() {
 		return StringPairSerializer.class.hashCode();
 	}
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index b4d6eb7..8c4e049 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapValueState;
@@ -270,7 +270,7 @@ public class QueryableStateClientTest {
 				ValueStateDescriptor<Integer> descriptor =
 						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 
-				RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(
+				RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
 						descriptor.getType(),
 						descriptor.getName(),
 						VoidNamespaceSerializer.INSTANCE,
@@ -279,7 +279,7 @@ public class QueryableStateClientTest {
 				// Register state
 				HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
 						descriptor,
-						new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredBackendStateMetaInfo),
+						new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
 						IntSerializer.INSTANCE,
 						VoidNamespaceSerializer.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index c04ed8c..50ca159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -97,7 +97,7 @@ public class OperatorStateBackendTest {
 		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
 
 		// make sure that type registrations are forwarded
-		TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer();
+		TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getStateMetaInfo().getPartitionStateSerializer();
 		assertTrue(serializer instanceof KryoSerializer);
 		assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType)
 				instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 0dbe2eb..02b4d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -29,10 +30,21 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class SerializationProxiesTest {
 
 	@Test
@@ -42,14 +54,14 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoList = new ArrayList<>();
+		List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>();
 
-		stateMetaInfoList.add(
-				new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer));
-		stateMetaInfoList.add(
-				new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer));
-		stateMetaInfoList.add(
-				new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer));
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy serializationProxy =
 				new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
@@ -67,8 +79,8 @@ public class SerializationProxiesTest {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
-		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializerProxy().getTypeSerializer());
-		Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies());
+		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
+		Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
 	}
 
 	@Test
@@ -78,41 +90,79 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfo =
-				new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer);
+		RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
-			metaInfo.write(new DataOutputViewStreamWrapper(out));
+			KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+				.writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
 			serialized = out.toByteArray();
 		}
 
-		metaInfo = new KeyedBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
-
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			metaInfo.read(new DataInputViewStreamWrapper(in));
+			metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.readStateMetaInfo(new DataInputViewStreamWrapper(in));
 		}
 
-		Assert.assertEquals(name, metaInfo.getStateName());
+		Assert.assertEquals(name, metaInfo.getName());
 	}
 
+	@Test
+	public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Exception {
+		String name = "test";
+		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+		RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+				.writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
+			serialized = out.toByteArray();
+		}
+
+		// mock failure when deserializing serializer
+		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.readStateMetaInfo(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(name, metaInfo.getName());
+		Assert.assertEquals(null, metaInfo.getNamespaceSerializer());
+		Assert.assertEquals(null, metaInfo.getStateSerializer());
+		Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), metaInfo.getNamespaceSerializerConfigSnapshot());
+		Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getStateSerializerConfigSnapshot());
+	}
 
 	@Test
 	public void testOperatorBackendSerializationProxyRoundtrip() throws Exception {
 
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		List<OperatorBackendSerializationProxy.StateMetaInfo<?>> stateMetaInfoList = new ArrayList<>();
+		List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots = new ArrayList<>();
 
-		stateMetaInfoList.add(
-				new OperatorBackendSerializationProxy.StateMetaInfo<>("a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-		stateMetaInfoList.add(
-				new OperatorBackendSerializationProxy.StateMetaInfo<>("b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-		stateMetaInfoList.add(
-				new OperatorBackendSerializationProxy.StateMetaInfo<>("c", stateSerializer, OperatorStateHandle.Mode.BROADCAST));
+		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+			"a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+			"b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+			"c", stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot());
 
 		OperatorBackendSerializationProxy serializationProxy =
-				new OperatorBackendSerializationProxy(stateMetaInfoList);
+				new OperatorBackendSerializationProxy(stateMetaInfoSnapshots);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -127,7 +177,7 @@ public class SerializationProxiesTest {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
-		Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies());
+		Assert.assertEquals(stateMetaInfoSnapshots, serializationProxy.getStateMetaInfoSnapshots());
 	}
 
 	@Test
@@ -136,22 +186,60 @@ public class SerializationProxiesTest {
 		String name = "test";
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
-				new OperatorBackendSerializationProxy.StateMetaInfo<>(name, stateSerializer, OperatorStateHandle.Mode.BROADCAST);
+		RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+			new RegisteredOperatorBackendStateMetaInfo<>(
+				name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot();
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+				.writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
+			serialized = out.toByteArray();
+		}
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.readStateMetaInfo(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(name, metaInfo.getName());
+	}
+
+	@Test
+	public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Exception {
+		String name = "test";
+		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+		RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+			new RegisteredOperatorBackendStateMetaInfo<>(
+				name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot();
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
-			metaInfo.write(new DataOutputViewStreamWrapper(out));
+			OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+				.writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
 			serialized = out.toByteArray();
 		}
 
-		metaInfo = new OperatorBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
+		// mock failure when deserializing serializer
+		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			metaInfo.read(new DataInputViewStreamWrapper(in));
+			metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.readStateMetaInfo(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(name, metaInfo.getName());
+		Assert.assertEquals(null, metaInfo.getPartitionStateSerializer());
+		Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getPartitionStateSerializerConfigSnapshot());
 	}
 
 	/**
@@ -171,4 +259,4 @@ public class SerializationProxiesTest {
 		Assert.assertEquals(5, StateDescriptor.Type.AGGREGATING.ordinal());
 		Assert.assertEquals(6, StateDescriptor.Type.MAP.ordinal());
 	}
-}
\ No newline at end of file
+}