Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:05 UTC

[7/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades for managed state

[FLINK-6178] [core] Allow serializer upgrades for managed state

This commit adds support for upgrading the serializers used for
Flink's managed state. It consists of two major changes: 1) a new
user-facing API in `TypeSerializer`, and 2) activating serializer
upgrades in the state backends.

For 1), the new user-facing API for `TypeSerializer`, the following is added:
- new class: TypeSerializerConfigSnapshot
- new class: CompatibilityResult
- new method: TypeSerializer#snapshotConfiguration()
- new method:
  TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)

Generally speaking, a configuration snapshot contains a point-in-time
view of a serializer's state / configuration and is persisted along
with checkpoints. On restore, the snapshotted configuration is checked
against the new serializer registered for the state; if needed and
possible, the new serializer reconfigures itself to become compatible.
A minimal sketch of what this looks like in a serializer is shown
below.
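
For illustration, a minimal sketch of the two new methods in a
hypothetical custom serializer `MyTypeSerializer<T>` (the names
`MyTypeSerializer` and `MyTypeSerializerConfigSnapshot` are assumptions
for this example; the pattern mirrors the WritableSerializer change
further down in this diff):

    @Override
    public MyTypeSerializerConfigSnapshot snapshotConfiguration() {
        // capture the settings needed to later check compatibility against a new serializer
        return new MyTypeSerializerConfigSnapshot(typeClass);
    }

    @Override
    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
        if (configSnapshot instanceof MyTypeSerializerConfigSnapshot
                && typeClass.equals(((MyTypeSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
            // same type and configuration; previous data can be read as-is
            return CompatibilityResult.compatible();
        } else {
            // no convert deserializer is provided, so a required migration would fail for now
            return CompatibilityResult.requiresMigration(null);
        }
    }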

This compatibility check is integrated into the state backends'
restore flow in 2). Currently, if the check determines that state
migration is required, the restore simply fails, because the state
migration feature isn't available yet.
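
Condensed, the check performed by the backends follows the pattern
below (a sketch based on the RocksDBKeyedStateBackend change in this
diff; `restoredMetaInfo` / `newMetaInfo` stand for the restored and the
newly registered state meta info, and `S` for the state type):

    CompatibilityResult<S> stateCompatibility = StateMigrationUtil.resolveCompatibilityResult(
            restoredMetaInfo.getStateSerializer(),
            TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
            restoredMetaInfo.getStateSerializerConfigSnapshot(),
            newMetaInfo.getStateSerializer());

    if (stateCompatibility.requiresMigration()) {
        // state migration is not yet implemented, so restoring with an
        // incompatible serializer currently fails
        throw new RuntimeException("State migration currently isn't supported.");
    }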


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aa5e057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aa5e057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aa5e057

Branch: refs/heads/master
Commit: 8aa5e05733655e7b3d1f11ed15f61672d61e5cb5
Parents: 409319a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 8 02:04:23 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 8 02:04:23 2017 +0800

----------------------------------------------------------------------
 .../typeutils/runtime/WritableSerializer.java   |  45 +-
 .../state/RocksDBKeyedStateBackend.java         | 203 ++++--
 .../streaming/state/RocksDBStateBackend.java    |   2 +-
 .../common/typeutils/CompatibilityResult.java   |  80 +++
 .../CompositeTypeSerializerConfigSnapshot.java  |  85 +++
 .../GenericTypeSerializerConfigSnapshot.java    |  88 +++
 .../ParameterlessTypeSerializerConfig.java      |  89 +++
 .../api/common/typeutils/TypeSerializer.java    |  53 +-
 .../typeutils/TypeSerializerConfigSnapshot.java | 103 +++
 .../TypeSerializerSerializationProxy.java       |  12 +-
 .../common/typeutils/TypeSerializerUtil.java    | 203 ++++++
 .../typeutils/base/BooleanSerializer.java       |   6 +
 .../typeutils/base/BooleanValueSerializer.java  |   9 +-
 .../common/typeutils/base/ByteSerializer.java   |   7 +-
 .../typeutils/base/ByteValueSerializer.java     |   9 +-
 .../common/typeutils/base/CharSerializer.java   |   7 +-
 .../typeutils/base/CharValueSerializer.java     |   9 +-
 .../CollectionSerializerConfigSnapshot.java     |  44 ++
 .../common/typeutils/base/DateSerializer.java   |   7 +
 .../common/typeutils/base/DoubleSerializer.java |   7 +-
 .../typeutils/base/DoubleValueSerializer.java   |   9 +-
 .../common/typeutils/base/EnumSerializer.java   | 175 ++++-
 .../common/typeutils/base/FloatSerializer.java  |   7 +-
 .../typeutils/base/FloatValueSerializer.java    |   9 +-
 .../typeutils/base/GenericArraySerializer.java  |  36 +-
 .../GenericArraySerializerConfigSnapshot.java   |  95 +++
 .../common/typeutils/base/IntSerializer.java    |   7 +-
 .../typeutils/base/IntValueSerializer.java      |   9 +-
 .../common/typeutils/base/ListSerializer.java   |  30 +-
 .../common/typeutils/base/LongSerializer.java   |   7 +-
 .../typeutils/base/LongValueSerializer.java     |   9 +-
 .../common/typeutils/base/MapSerializer.java    |  39 +-
 .../base/MapSerializerConfigSnapshot.java       |  48 ++
 .../common/typeutils/base/ShortSerializer.java  |   7 +-
 .../typeutils/base/ShortValueSerializer.java    |   9 +-
 .../typeutils/base/SqlDateSerializer.java       |   7 +
 .../typeutils/base/SqlTimeSerializer.java       |  12 +
 .../common/typeutils/base/StringSerializer.java |   6 +
 .../typeutils/base/StringValueSerializer.java   |   9 +-
 .../typeutils/base/TypeSerializerSingleton.java |  32 +
 .../array/BooleanPrimitiveArraySerializer.java  |   3 +-
 .../array/BytePrimitiveArraySerializer.java     |   2 +-
 .../array/CharPrimitiveArraySerializer.java     |   3 +-
 .../array/DoublePrimitiveArraySerializer.java   |   3 +-
 .../array/FloatPrimitiveArraySerializer.java    |   3 +-
 .../base/array/IntPrimitiveArraySerializer.java |   3 +-
 .../array/LongPrimitiveArraySerializer.java     |   3 +-
 .../array/ShortPrimitiveArraySerializer.java    |   3 +-
 .../base/array/StringArraySerializer.java       |   3 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 144 +++-
 .../runtime/CopyableValueSerializer.java        |  42 +-
 .../typeutils/runtime/EitherSerializer.java     |  37 +
 .../runtime/EitherSerializerConfigSnapshot.java |  49 ++
 .../typeutils/runtime/KryoRegistration.java     | 173 +++++
 ...ryoRegistrationSerializerConfigSnapshot.java | 251 +++++++
 .../api/java/typeutils/runtime/KryoUtils.java   |  27 +
 .../java/typeutils/runtime/PojoSerializer.java  | 681 ++++++++++++++++---
 .../java/typeutils/runtime/RowSerializer.java   |  81 ++-
 .../typeutils/runtime/TupleSerializerBase.java  |  42 ++
 .../runtime/TupleSerializerConfigSnapshot.java  |  91 +++
 .../java/typeutils/runtime/ValueSerializer.java |  86 ++-
 .../typeutils/runtime/kryo/KryoSerializer.java  | 218 ++++--
 .../common/typeutils/SerializerTestBase.java    |  53 ++
 .../TypeSerializerConfigSnapshotTest.java       | 147 ++++
 .../typeutils/base/EnumSerializerTest.java      | 140 ++++
 .../array/CharPrimitiveArraySerializerTest.java |   2 -
 .../typeutils/runtime/PojoSerializerTest.java   | 270 +++++++-
 .../kryo/KryoSerializerCompatibilityTest.java   | 136 ++++
 .../api/java/io/CollectionInputFormatTest.java  |  12 +
 .../flink/cep/NonDuplicatingTypeSerializer.java |  16 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  47 +-
 .../AbstractKeyedCEPPatternOperator.java        |  34 +-
 .../valuearray/IntValueArraySerializer.java     |   7 +
 .../valuearray/LongValueArraySerializer.java    |   7 +
 .../valuearray/StringValueArraySerializer.java  |   7 +
 .../table/runtime/types/CRowSerializer.scala    |  53 +-
 .../MigrationNamespaceSerializerProxy.java      |  17 +-
 .../AbstractMigrationRestoreStrategy.java       |   8 +-
 .../runtime/state/ArrayListSerializer.java      |  31 +-
 .../state/DefaultOperatorStateBackend.java      | 130 ++--
 .../flink/runtime/state/HashMapSerializer.java  |  40 +-
 .../flink/runtime/state/JavaSerializer.java     |  19 +-
 .../state/KeyedBackendSerializationProxy.java   | 178 +----
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 257 +++++++
 .../OperatorBackendSerializationProxy.java      | 168 ++---
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 233 +++++++
 .../state/RegisteredBackendStateMetaInfo.java   | 145 ----
 .../RegisteredKeyedBackendStateMetaInfo.java    | 246 +++++++
 .../RegisteredOperatorBackendStateMetaInfo.java | 198 ++++++
 .../flink/runtime/state/StateMigrationUtil.java |  83 +++
 .../runtime/state/VoidNamespaceSerializer.java  |   8 +
 .../state/heap/CopyOnWriteStateTable.java       |  12 +-
 .../state/heap/HeapKeyedStateBackend.java       |  68 +-
 .../state/heap/NestedMapsStateTable.java        |   4 +-
 .../flink/runtime/state/heap/StateTable.java    |  12 +-
 .../state/heap/StateTableByKeyGroupReaders.java |   9 +-
 .../testutils/types/IntListSerializer.java      |  12 +
 .../testutils/types/IntPairSerializer.java      |  12 +
 .../testutils/types/StringPairSerializer.java   |  12 +
 .../runtime/query/QueryableStateClientTest.java |   6 +-
 .../runtime/state/OperatorStateBackendTest.java |   2 +-
 .../runtime/state/SerializationProxiesTest.java | 150 +++-
 .../runtime/state/StateBackendTestBase.java     | 305 ++++++++-
 .../state/heap/CopyOnWriteStateTableTest.java   |  34 +-
 .../StateTableSnapshotCompatibilityTest.java    |   6 +-
 .../testutils/recordutils/RecordSerializer.java |  12 +
 .../api/scala/typeutils/EitherSerializer.scala  |  48 +-
 .../scala/typeutils/EnumValueSerializer.scala   | 116 +++-
 .../api/scala/typeutils/NothingSerializer.scala |  12 +-
 .../api/scala/typeutils/OptionSerializer.scala  |  52 +-
 .../scala/typeutils/TraversableSerializer.scala |  13 +-
 .../api/scala/typeutils/TrySerializer.scala     |  58 +-
 .../MultiplexingStreamRecordSerializer.java     |  51 ++
 .../streamrecord/StreamRecordSerializer.java    |  49 ++
 .../api/datastream/CoGroupedStreams.java        |  12 +
 .../streaming/api/operators/InternalTimer.java  |  12 +
 .../api/windowing/windows/GlobalWindow.java     |  18 +-
 .../api/windowing/windows/TimeWindow.java       |  19 +-
 .../streamrecord/StreamElementSerializer.java   |  53 ++
 .../jar/CheckpointingCustomKvStateProgram.java  |   1 -
 120 files changed, 6429 insertions(+), 1011 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 9036d75..1a02e7b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -20,7 +20,11 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 
 import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
@@ -30,7 +34,8 @@ import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.IOException;
 
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+@Internal
+public final class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -149,4 +154,42 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof WritableSerializer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public WritableSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new WritableSerializerConfigSnapshot<>(typeClass);
+	}
+
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof WritableSerializerConfigSnapshot
+				&& typeClass.equals(((WritableSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
+
+			return CompatibilityResult.compatible();
+		} else {
+			return CompatibilityResult.requiresMigration(null);
+		}
+	}
+
+	public static final class WritableSerializerConfigSnapshot<T extends Writable>
+			extends GenericTypeSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public WritableSerializerConfigSnapshot() {}
+
+		public WritableSerializerConfigSnapshot(Class<T> writableTypeClass) {
+			super(writableTypeClass);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b8e60cd..079ea13 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,7 +26,9 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -52,6 +54,7 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -59,9 +62,9 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -151,7 +154,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private Map<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> kvStateInformation;
+	private Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+
+	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -229,7 +240,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// and access it in a synchronized block that locks on #dbDisposeLock.
 			if (db != null) {
 
-				for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
+				for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
 						kvStateInformation.values()) {
 					try {
 						column.f0.close();
@@ -568,23 +579,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private void writeKVStateMetaData() throws IOException {
 
-			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
 					new ArrayList<>(stateBackend.kvStateInformation.size());
 
 			int kvStateId = 0;
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> column :
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
 					stateBackend.kvStateInformation.entrySet()) {
 
-				RegisteredBackendStateMetaInfo<?, ?> metaInfo = column.getValue().f1;
-
-				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
-						new KeyedBackendSerializationProxy.StateMetaInfo<>(
-								metaInfo.getStateType(),
-								metaInfo.getName(),
-								metaInfo.getNamespaceSerializer(),
-								metaInfo.getStateSerializer());
-
-				metaInfoList.add(metaInfoProxy);
+				metaInfoSnapshots.add(column.getValue().f1.snapshot());
 
 				//retrieve iterator for this k/v states
 				readOptions = new ReadOptions();
@@ -597,7 +599,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoList);
+					new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots);
 
 			serializationProxy.write(outputView);
 		}
@@ -717,7 +719,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Map<String, StreamStateHandle> baseSstFiles;
 
-		private final List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
 
 		private FileSystem backupFileSystem;
 		private Path backupPath;
@@ -800,7 +802,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
 
 				KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
 				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
 
 				serializationProxy.write(out);
@@ -823,18 +825,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
 			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
-
-				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
-
-				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
-					new KeyedBackendSerializationProxy.StateMetaInfo<>(
-						metaInfo.getStateType(),
-						metaInfo.getName(),
-						metaInfo.getNamespaceSerializer(),
-						metaInfo.getStateSerializer());
-
-				stateMetaInfos.add(metaInfoProxy);
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+					: stateBackend.kvStateInformation.entrySet()) {
+				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
 			}
 
 			// save state data
@@ -1112,33 +1105,38 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			serializationProxy.read(currentStateHandleInView);
 
-			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList =
-					serializationProxy.getNamedStateSerializationProxies();
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
+					serializationProxy.getStateMetaInfoSnapshots();
 
-			currentStateHandleKVStateColumnFamilies = new ArrayList<>(metaInfoProxyList.size());
+			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
+			rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
 
-			for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy : metaInfoProxyList) {
-				Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> columnFamily =
-						rocksDBKeyedStateBackend.kvStateInformation.get(metaInfoProxy.getStateName());
+			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
 
-				if (null == columnFamily) {
+				if (!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName())) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
 						rocksDBKeyedStateBackend.columnOptions);
 
-					RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
-							new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
+					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+							new RegisteredKeyedBackendStateMetaInfo<>(
+								restoredMetaInfo.getStateType(),
+								restoredMetaInfo.getName(),
+								restoredMetaInfo.getNamespaceSerializer(),
+								restoredMetaInfo.getStateSerializer());
 
-					columnFamily = new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
-							rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor),
-							stateMetaInfo);
+					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
-					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), columnFamily);
+					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
+
+					rocksDBKeyedStateBackend.kvStateInformation.put(
+						stateMetaInfo.getName(),
+						new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo));
+
+					currentStateHandleKVStateColumnFamilies.add(columnFamily);
 				} else {
-					//TODO we could check here for incompatible serializer versions between previous tasks
+					// TODO with eager state registration in place, check here for serializer migration strategies
 				}
-
-				currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
 			}
 		}
 
@@ -1198,7 +1196,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.stateBackend = stateBackend;
 		}
 
-		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
+		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
 				StreamStateHandle metaStateHandle) throws Exception {
 
 			FSDataInputStream inputStream = null;
@@ -1212,7 +1210,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
-				return serializationProxy.getNamedStateSerializationProxies();
+				return serializationProxy.getStateMetaInfoSnapshots();
 			} finally {
 				if (inputStream != null) {
 					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
@@ -1294,18 +1292,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 
 				// read meta data
-				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
+				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
 					readMetaData(restoreStateHandle.getMetaStateHandle());
 
 				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
 
-				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
+				stateBackend.restoredKvStateMetaInfos = new HashMap<>(stateMetaInfoSnapshots.size());
+
+				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
 
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
 						stateBackend.columnOptions);
 
 					columnFamilyDescriptors.add(columnFamilyDescriptor);
+					stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
 				}
 
 				if (hasExtraKeys) {
@@ -1320,23 +1321,27 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						for (int i = 0; i < columnFamilyHandles.size(); ++i) {
 							ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
 							ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-							KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i);
+							RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
-							Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
-								stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName());
+							Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+								stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
 							if (null == registeredStateMetaInfoEntry) {
 
-								RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
-									new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
+								RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+									new RegisteredKeyedBackendStateMetaInfo<>(
+										stateMetaInfoSnapshot.getStateType(),
+										stateMetaInfoSnapshot.getName(),
+										stateMetaInfoSnapshot.getNamespaceSerializer(),
+										stateMetaInfoSnapshot.getStateSerializer());
 
 								registeredStateMetaInfoEntry =
-									new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+									new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
 										stateBackend.db.createColumnFamily(columnFamilyDescriptor),
 										stateMetaInfo);
 
 								stateBackend.kvStateInformation.put(
-									stateMetaInfoProxy.getStateName(),
+									stateMetaInfoSnapshot.getName(),
 									registeredStateMetaInfoEntry);
 							}
 
@@ -1403,15 +1408,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						columnFamilyDescriptors, columnFamilyHandles);
 
 					for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
-						KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i);
+						RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
 						ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-						RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
-							new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
+						RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+							new RegisteredKeyedBackendStateMetaInfo<>(
+								stateMetaInfoSnapshot.getStateType(),
+								stateMetaInfoSnapshot.getName(),
+								stateMetaInfoSnapshot.getNamespaceSerializer(),
+								stateMetaInfoSnapshot.getStateSerializer());
 
 						stateBackend.kvStateInformation.put(
-							stateMetaInfoProxy.getStateName(),
-							new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+							stateMetaInfoSnapshot.getName(),
+							new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
 								columnFamilyHandle, stateMetaInfo));
 					}
 
@@ -1473,22 +1482,57 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	protected <N, S> ColumnFamilyHandle getColumnFamily(
 			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
 
-		Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
 				kvStateInformation.get(descriptor.getName());
 
-		RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
-				descriptor.getType(),
-				descriptor.getName(),
-				namespaceSerializer,
-				descriptor.getSerializer());
+		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			descriptor.getType(),
+			descriptor.getName(),
+			namespaceSerializer,
+			descriptor.getSerializer());
 
 		if (stateInfo != null) {
-			if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+			// TODO with eager registration in place, these checks should be moved to restore()
+
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
+				restoredKvStateMetaInfos.get(descriptor.getName());
+
+			Preconditions.checkState(
+				newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+				"Incompatible state names. " +
+					"Was [" + restoredMetaInfo.getName() + "], " +
+					"registered with [" + newMetaInfo.getName() + "].");
+
+			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+				&& !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+				Preconditions.checkState(
+					newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+					"Incompatible state types. " +
+						"Was [" + restoredMetaInfo.getStateType() + "], " +
+						"registered with [" + newMetaInfo.getStateType() + "].");
+			}
+
+			// check compatibility results to determine if state migration is required
+
+			CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getNamespaceSerializer(),
+					MigrationNamespaceSerializerProxy.class,
+					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+					newMetaInfo.getNamespaceSerializer());
+
+			CompatibilityResult<S> stateCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getStateSerializer(),
+					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+					restoredMetaInfo.getStateSerializerConfigSnapshot(),
+					newMetaInfo.getStateSerializer());
+
+			if (!namespaceCompatibility.requiresMigration() && !stateCompatibility.requiresMigration()) {
 				stateInfo.f1 = newMetaInfo;
 				return stateInfo.f0;
 			} else {
-				throw new IOException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
-						" trying access with " + newMetaInfo);
+				// TODO state migration currently isn't possible.
+				throw new RuntimeException("State migration currently isn't supported.");
 			}
 		}
 
@@ -1497,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
-			Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<N, S>> tuple =
+			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
 					new Tuple2<>(columnFamily, newMetaInfo);
 			Map rawAccess = kvStateInformation;
 			rawAccess.put(descriptor.getName(), tuple);
@@ -1832,6 +1876,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// clear k/v state information before filling it
 		kvStateInformation.clear();
 
+		restoredKvStateMetaInfos = new HashMap<>(namedStates.size());
+
 		// first get the column family mapping
 		int numColumns = inputView.readInt();
 		Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
@@ -1846,6 +1892,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			columnFamilyMapping.put(mappingByte, stateDescriptor);
 
+			// mimic a restored kv state meta info
+			restoredKvStateMetaInfos.put(
+				stateDescriptor.getName(),
+				new RegisteredKeyedBackendStateMetaInfo<>(
+					stateDescriptor.getType(),
+					stateDescriptor.getName(),
+					MigrationNamespaceSerializerProxy.INSTANCE,
+					stateDescriptor.getSerializer()).snapshot());
+
 			// this will fill in the k/v state information
 			getColumnFamily(stateDescriptor, MigrationNamespaceSerializerProxy.INSTANCE);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index e5a78b6..695aa12 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -75,7 +75,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 
 		private static void throwExceptionOnLoadingThisClass() {
-			throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
+			throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. "
 					+ "Unfortunately, this is not supported. Please create a new savepoint for the job using fully "
 					+ "async mode in Flink 1.1 and run migration again with the new savepoint.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
new file mode 100644
index 0000000..cfbb516
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@code CompatibilityResult} contains information about whether or not data migration
+ * is required in order to continue using new serializers for previously serialized data.
+ *
+ * @param <T> the type of the data being migrated.
+ */
+@PublicEvolving
+public final class CompatibilityResult<T> {
+
+	/** Whether or not migration is required. */
+	private final boolean requiresMigration;
+
+	/**
+	 * The convert deserializer to use for reading previous data during migration,
+	 * in the case that the preceding serializer cannot be found.
+	 *
+	 * <p>This is only relevant if migration is required.
+	 */
+	private final TypeSerializer<T> convertDeserializer;
+
+	/**
+	 * Returns a strategy that signals that the new serializer is compatible and no migration is required.
+	 *
+	 * @return a result that signals migration is not required for the new serializer
+	 */
+	public static <T> CompatibilityResult<T> compatible() {
+		return new CompatibilityResult<>(false, null);
+	}
+
+	/**
+	 * Returns a strategy that signals migration to be performed.
+	 *
+	 * <p>Furthermore, in the case that the preceding serializer cannot be found or restored to read the
+	 * previous data during migration, a provided convert deserializer can be used (may be {@code null}
+	 * if one cannot be provided).
+	 *
+	 * <p>In the case that the preceding serializer cannot be found and a convert deserializer is not
+	 * provided, the migration will fail due to the incapability of reading previous data.
+	 *
+	 * @return a result that signals migration is necessary, possibly providing a convert deserializer.
+	 */
+	public static <T> CompatibilityResult<T> requiresMigration(TypeSerializer<T> convertDeserializer) {
+		return new CompatibilityResult<>(true, convertDeserializer);
+	}
+
+	private CompatibilityResult(boolean requiresMigration, TypeSerializer<T> convertDeserializer) {
+		this.requiresMigration = requiresMigration;
+		this.convertDeserializer = convertDeserializer;
+	}
+
+	public TypeSerializer<T> getConvertDeserializer() {
+		return convertDeserializer;
+	}
+
+	public boolean requiresMigration() {
+		return requiresMigration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..e7e2650
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for serializers that have multiple nested serializers.
+ * The configuration snapshot consists of the configuration snapshots of all nested serializers.
+ */
+@Internal
+public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+	private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public CompositeTypeSerializerConfigSnapshot() {}
+
+	public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
+		this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+		TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+		nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
+	}
+
+	public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
+		return nestedSerializerConfigSnapshots;
+	}
+
+	public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
+		return nestedSerializerConfigSnapshots[0];
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (obj == null) {
+			return false;
+		}
+
+		return (obj.getClass().equals(getClass()))
+				&& Arrays.equals(
+					nestedSerializerConfigSnapshots,
+					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
+	}
+
+	@Override
+	public int hashCode() {
+		return Arrays.hashCode(nestedSerializerConfigSnapshots);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..4edfe12
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Configuration snapshot for serializers for generic types.
+ *
+ * @param <T> The type to be instantiated.
+ */
+@Internal
+public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
+
+	private Class<T> typeClass;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public GenericTypeSerializerConfigSnapshot() {}
+
+	public GenericTypeSerializerConfigSnapshot(Class<T> typeClass) {
+		this.typeClass = Preconditions.checkNotNull(typeClass);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		// write only the classname to avoid Java serialization
+		out.writeUTF(typeClass.getName());
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		String genericTypeClassname = in.readUTF();
+		try {
+			typeClass = (Class<T>) Class.forName(genericTypeClassname, true, getUserCodeClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Could not find the requested class " + genericTypeClassname + " in classpath.", e);
+		}
+	}
+
+	public Class<T> getTypeClass() {
+		return typeClass;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (obj == null) {
+			return false;
+		}
+
+		return (obj.getClass().equals(getClass()))
+				&& typeClass.equals(((GenericTypeSerializerConfigSnapshot) obj).getTypeClass());
+	}
+
+	@Override
+	public int hashCode() {
+		return typeClass.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
new file mode 100644
index 0000000..7ba7dd4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
+ */
+@Internal
+public final class ParameterlessTypeSerializerConfig extends TypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/**
+	 * A string identifier that encodes the serialization format used by the serializer.
+	 *
+	 * TODO we might change this to a proper serialization format class in the future
+	 */
+	private String serializationFormatIdentifier;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ParameterlessTypeSerializerConfig() {}
+
+	public ParameterlessTypeSerializerConfig(String serializationFormatIdentifier) {
+		this.serializationFormatIdentifier = Preconditions.checkNotNull(serializationFormatIdentifier);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+		out.writeUTF(serializationFormatIdentifier);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+		serializationFormatIdentifier = in.readUTF();
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	public String getSerializationFormatIdentifier() {
+		return serializationFormatIdentifier;
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (other == this) {
+			return true;
+		}
+
+		if (other == null) {
+			return false;
+		}
+
+		return (other instanceof ParameterlessTypeSerializerConfig)
+				&& serializationFormatIdentifier.equals(((ParameterlessTypeSerializerConfig) other).getSerializationFormatIdentifier());
+	}
+
+	@Override
+	public int hashCode() {
+		return serializationFormatIdentifier.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 6edaec6..f0562d4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -161,7 +161,54 @@ public abstract class TypeSerializer<T> implements Serializable {
 
 	public abstract int hashCode();
 
-	public boolean canRestoreFrom(TypeSerializer<?> other) {
-		return equals(other);
-	}
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
+	 * registered to (if any - this method is only relevant if this serializer is registered for serialization of
+	 * managed state).
+	 *
+	 * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
+	 * serialization format. When a new serializer is registered to serialize the same managed state that this
+	 * serializer was registered to, the returned configuration snapshot can be used to ensure compatibility
+	 * of the new serializer and determine if state migration is required.
+	 *
+	 * @see TypeSerializerConfigSnapshot
+	 *
+	 * @return snapshot of the serializer's current configuration (cannot be {@code null}).
+	 */
+	public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+	/**
+	 * Ensure compatibility of this serializer with a preceding serializer that was registered for serialization of
+	 * the same managed state (if any - this method is only relevant if this serializer is registered for
+	 * serialization of managed state).
+	 *
+	 * The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
+	 * snapshot. The method may reconfigure the serializer (if required and possible) so that it may be compatible,
+	 * or provide a signaling result that informs Flink that state migration is necessary before continuing to use
+	 * this serializer.
+	 *
+	 * <p>The result can be one of the following:
+	 * <ul>
+	 *     <li>{@link CompatibilityResult#compatible()}: this signals Flink that this serializer is compatible, or
+	 *     has been reconfigured to be compatible, to continue reading previous data, and that the
+	 *     serialization schema remains the same. No migration needs to be performed.</li>
+	 *
+	 *     <li>{@link CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that
+	 *     migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
+	 *     compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
+	 *     restored to read the previous data to perform the migration, the provided convert deserializer can be
+	 *     used (may be {@code null} if one cannot be provided).</li>
+	 * </ul>
+	 *
+	 * @see CompatibilityResult
+	 *
+	 * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
+	 *
+	 * @return the determined compatibility result.
+	 */
+	public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..27369b9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.VersionMismatchException;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
+ * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the
+ * serializer is registered to.
+ *
+ * <p>The persisted configuration may later on be used by new serializers to ensure serialization compatibility
+ * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot
+ * should encode sufficient information about:
+ *
+ * <ul>
+ *   <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
+ *   required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
+ *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
+ *   serializers.</li>
+ *
+ *   <li><strong>Serialization schema of the serializer:</strong> the data format used by the serializer.</li>
+ * </ul>
+ *
+ * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
+ * deserialize the configuration snapshot from its binary form.
+ */
+@PublicEvolving
+public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+
+	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
+	private ClassLoader userCodeClassLoader;
+
+	/** The snapshot version of this configuration. */
+	private Integer snapshotVersion;
+
+	/**
+	 * Returns the version of the configuration at the time its snapshot was taken.
+	 *
+	 * @return the snapshot configuration's version.
+	 */
+	public int getSnapshotVersion() {
+		if (snapshotVersion == null) {
+			return getVersion();
+		} else {
+			return snapshotVersion;
+		}
+	}
+
+	/**
+	 * Set the user code class loader.
+	 * Only relevant if this configuration instance was deserialized from binary form.
+	 *
+	 * <p>This method is not part of the public user-facing API, and cannot be overridden.
+	 *
+	 * @param userCodeClassLoader user code class loader.
+	 */
+	@Internal
+	public final void setUserCodeClassLoader(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+	}
+
+	/**
+	 * Returns the user code class loader.
+	 * Only relevant if this configuration instance was deserialized from binary form.
+	 *
+	 * @return the user code class loader
+	 */
+	@Internal
+	public final ClassLoader getUserCodeClassLoader() {
+		return userCodeClassLoader;
+	}
+
+	@Override
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		super.resolveVersionRead(foundVersion);
+		this.snapshotVersion = foundVersion;
+	}
+
+	public abstract boolean equals(Object obj);
+
+	public abstract int hashCode();
+}
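
To make the contract above concrete, the following is a minimal sketch of a configuration snapshot for a hypothetical serializer that is parameterized by a single delimiter byte; the class name and the delimiter field are illustrative only and are not part of this commit:

	import java.io.IOException;

	import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
	import org.apache.flink.core.memory.DataInputView;
	import org.apache.flink.core.memory.DataOutputView;

	public class DelimiterSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {

		private static final int VERSION = 1;

		/** The delimiter byte the (hypothetical) serializer was configured with. */
		private byte delimiter;

		/** This empty nullary constructor is required for deserializing the configuration. */
		public DelimiterSerializerConfigSnapshot() {}

		public DelimiterSerializerConfigSnapshot(byte delimiter) {
			this.delimiter = delimiter;
		}

		@Override
		public void write(DataOutputView out) throws IOException {
			super.write(out);
			out.writeByte(delimiter);
		}

		@Override
		public void read(DataInputView in) throws IOException {
			super.read(in);
			this.delimiter = in.readByte();
		}

		@Override
		public int getVersion() {
			return VERSION;
		}

		@Override
		public boolean equals(Object obj) {
			return obj instanceof DelimiterSerializerConfigSnapshot
				&& ((DelimiterSerializerConfigSnapshot) obj).delimiter == delimiter;
		}

		@Override
		public int hashCode() {
			return delimiter;
		}
	}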

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
index c94124f..067a1ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -143,7 +143,7 @@ public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWrit
 	 * Dummy TypeSerializer used to prevent data loss when a serializer for which we encountered a
 	 * {@link ClassNotFoundException} is checkpointed again.
 	 */
-	static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
+	public static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
 
 		private static final long serialVersionUID = 2526330533671642711L;
 		private final byte[] actualBytes;
@@ -207,6 +207,16 @@ public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWrit
 		}
 
 		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
+		public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+		}
+
+		@Override
 		public boolean canEqual(Object obj) {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
new file mode 100644
index 0000000..0a2148a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Utility methods for {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
+ */
+@Internal
+public class TypeSerializerUtil {
+
+	/**
+	 * Creates an array of {@link TypeSerializerConfigSnapshot}s taken
+	 * from the provided array of {@link TypeSerializer}s.
+	 *
+	 * @param serializers array of type serializers.
+	 *
+	 * @return array of configuration snapshots taken from each serializer.
+	 */
+	public static TypeSerializerConfigSnapshot[] snapshotConfigurations(TypeSerializer<?>[] serializers) {
+		final TypeSerializerConfigSnapshot[] configSnapshots = new TypeSerializerConfigSnapshot[serializers.length];
+
+		for (int i = 0; i < serializers.length; i++) {
+			configSnapshots[i] = serializers[i].snapshotConfiguration();
+		}
+
+		return configSnapshots;
+	}
+
+	/**
+	 * Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view.
+	 *
+	 * <p>It is written in a format that can later be read again using
+	 * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+	 *
+	 * @param out the data output view
+	 * @param serializerConfigSnapshot the serializer configuration snapshot to write
+	 *
+	 * @throws IOException
+	 */
+	public static void writeSerializerConfigSnapshot(
+			DataOutputView out,
+			TypeSerializerConfigSnapshot serializerConfigSnapshot) throws IOException {
+
+		new TypeSerializerConfigSnapshotProxy(serializerConfigSnapshot).write(out);
+	}
+
+	/**
+	 * Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously
+	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
+	 *
+	 * @param in the data input view
+	 * @param userCodeClassLoader the user code class loader to use
+	 *
+	 * @return the read serializer configuration snapshot
+	 *
+	 * @throws IOException
+	 */
+	public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+			DataInputView in,
+			ClassLoader userCodeClassLoader) throws IOException {
+
+		final TypeSerializerConfigSnapshotProxy proxy = new TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
+		proxy.read(in);
+
+		return proxy.getSerializerConfigSnapshot();
+	}
+
+	/**
+	 * Writes multiple {@link TypeSerializerConfigSnapshot}s to the provided data output view.
+	 *
+	 * <p>They are written in a format that can later be read again using
+	 * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
+	 *
+	 * @param out the data output view
+	 * @param serializerConfigSnapshots the serializer configuration snapshots to write
+	 *
+	 * @throws IOException
+	 */
+	public static void writeSerializerConfigSnapshots(
+			DataOutputView out,
+			TypeSerializerConfigSnapshot... serializerConfigSnapshots) throws IOException {
+
+		out.writeInt(serializerConfigSnapshots.length);
+
+		for (TypeSerializerConfigSnapshot snapshot : serializerConfigSnapshots) {
+			new TypeSerializerConfigSnapshotProxy(snapshot).write(out);
+		}
+	}
+
+	/**
+	 * Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that were previously
+	 * written using {@link #writeSerializerConfigSnapshots(DataOutputView, TypeSerializerConfigSnapshot...)}.
+	 *
+	 * @param in the data input view
+	 * @param userCodeClassLoader the user code class loader to use
+	 *
+	 * @return the read serializer configuration snapshots
+	 *
+	 * @throws IOException
+	 */
+	public static TypeSerializerConfigSnapshot[] readSerializerConfigSnapshots(
+			DataInputView in,
+			ClassLoader userCodeClassLoader) throws IOException {
+
+		int numFields = in.readInt();
+		final TypeSerializerConfigSnapshot[] serializerConfigSnapshots = new TypeSerializerConfigSnapshot[numFields];
+
+		TypeSerializerConfigSnapshotProxy proxy;
+		for (int i = 0; i < numFields; i++) {
+			proxy = new TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
+			proxy.read(in);
+			serializerConfigSnapshots[i] = proxy.getSerializerConfigSnapshot();
+		}
+
+		return serializerConfigSnapshots;
+	}
+
+	/**
+	 * Utility serialization proxy for a {@link TypeSerializerConfigSnapshot}.
+	 */
+	static class TypeSerializerConfigSnapshotProxy extends VersionedIOReadableWritable {
+
+		private static final int VERSION = 1;
+
+		private ClassLoader userCodeClassLoader;
+		private TypeSerializerConfigSnapshot serializerConfigSnapshot;
+
+		TypeSerializerConfigSnapshotProxy(ClassLoader userCodeClassLoader) {
+			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		}
+
+		TypeSerializerConfigSnapshotProxy(TypeSerializerConfigSnapshot serializerConfigSnapshot) {
+			this.serializerConfigSnapshot = serializerConfigSnapshot;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			// write the config snapshot's class name, so that we can re-instantiate
+			// the correct config snapshot type when deserializing
+			out.writeUTF(serializerConfigSnapshot.getClass().getName());
+
+			// the actual configuration parameters
+			serializerConfigSnapshot.write(out);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			String serializerConfigClassname = in.readUTF();
+			Class<? extends TypeSerializerConfigSnapshot> serializerConfigSnapshotClass;
+			try {
+				serializerConfigSnapshotClass = (Class<? extends TypeSerializerConfigSnapshot>)
+					Class.forName(serializerConfigClassname, true, userCodeClassLoader);
+			} catch (ClassNotFoundException e) {
+				throw new IOException(
+					"Could not find the requested TypeSerializerConfigSnapshot class "
+						+ serializerConfigClassname + " in the classpath.", e);
+			}
+
+			serializerConfigSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
+			serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
+			serializerConfigSnapshot.read(in);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
+			return serializerConfigSnapshot;
+		}
+	}
+}
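
For illustration, a round trip through these utilities might look like the following sketch; it assumes DataOutputViewStreamWrapper and DataInputViewStreamWrapper as the stream-backed DataOutputView / DataInputView implementations:

	import java.io.ByteArrayInputStream;
	import java.io.ByteArrayOutputStream;

	import org.apache.flink.api.common.typeutils.TypeSerializer;
	import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
	import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
	import org.apache.flink.api.common.typeutils.base.IntSerializer;
	import org.apache.flink.api.common.typeutils.base.StringSerializer;
	import org.apache.flink.core.memory.DataInputViewStreamWrapper;
	import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

	public class ConfigSnapshotRoundTrip {

		public static void main(String[] args) throws Exception {
			TypeSerializer<?>[] serializers = { IntSerializer.INSTANCE, StringSerializer.INSTANCE };

			// take point-in-time snapshots of all serializers
			TypeSerializerConfigSnapshot[] snapshots = TypeSerializerUtil.snapshotConfigurations(serializers);

			// write them out, e.g. into checkpoint metadata
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			TypeSerializerUtil.writeSerializerConfigSnapshots(new DataOutputViewStreamWrapper(baos), snapshots);

			// on restore: read them back, resolving the snapshot classes with the user code class loader
			TypeSerializerConfigSnapshot[] restored = TypeSerializerUtil.readSerializerConfigSnapshots(
				new DataInputViewStreamWrapper(new ByteArrayInputStream(baos.toByteArray())),
				ConfigSnapshotRoundTrip.class.getClassLoader());

			// a new serializer for the same state can then be checked against the restored snapshot:
			// CompatibilityResult<Integer> result = IntSerializer.INSTANCE.ensureCompatibility(restored[0]);
		}
	}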

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 609f184..f275807 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -82,4 +82,10 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof BooleanSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(BooleanValueSerializer.class.getCanonicalName());
+	}
 }
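
The override above works because BooleanSerializer and BooleanValueSerializer write the identical single-byte format, which is why each declares the other's canonical class name as a compatible serialization format identifier. A quick sketch to illustrate, assuming DataOutputViewStreamWrapper as the DataOutputView implementation:

	import java.io.ByteArrayOutputStream;
	import java.util.Arrays;

	import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
	import org.apache.flink.api.common.typeutils.base.BooleanValueSerializer;
	import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
	import org.apache.flink.types.BooleanValue;

	public class BooleanFormatCheck {

		public static void main(String[] args) throws Exception {
			ByteArrayOutputStream boxed = new ByteArrayOutputStream();
			ByteArrayOutputStream value = new ByteArrayOutputStream();

			BooleanSerializer.INSTANCE.serialize(true, new DataOutputViewStreamWrapper(boxed));
			BooleanValueSerializer.INSTANCE.serialize(new BooleanValue(true), new DataOutputViewStreamWrapper(value));

			// both arrays contain the identical single byte, hence the shared serialization format
			System.out.println(Arrays.equals(boxed.toByteArray(), value.toByteArray())); // true
		}
	}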

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index 62c91df..4755549 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -31,8 +31,7 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	private static final long serialVersionUID = 1L;
 	
 	public static final BooleanValueSerializer INSTANCE = new BooleanValueSerializer();
-	
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -86,4 +85,10 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	public boolean canEqual(Object obj) {
 		return obj instanceof BooleanValueSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(BooleanSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index 6ad7e4e..bf2baf5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -33,7 +33,6 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	
 	private static final Byte ZERO = Byte.valueOf((byte) 0);
 
-
 	@Override
 	public boolean isImmutableType() {
 		return true;
@@ -83,4 +82,10 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof ByteSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(ByteValueSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index 848b01e..2547dda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -31,8 +31,7 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	private static final long serialVersionUID = 1L;
 	
 	public static final ByteValueSerializer INSTANCE = new ByteValueSerializer();
-	
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -84,4 +83,10 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	public boolean canEqual(Object obj) {
 		return obj instanceof ByteValueSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+				|| identifier.equals(ByteSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 21a32f1..dda3543 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -33,7 +33,6 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	
 	private static final Character ZERO = Character.valueOf((char)0);
 
-
 	@Override
 	public boolean isImmutableType() {
 		return true;
@@ -83,4 +82,10 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof CharSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(CharValueSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 84dc39a..e012b8c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -31,8 +31,7 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	private static final long serialVersionUID = 1L;
 	
 	public static final CharValueSerializer INSTANCE = new CharValueSerializer();
-	
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -84,4 +83,10 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof CharValueSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(CharSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
new file mode 100644
index 0000000..8fa2315
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Configuration snapshot of a serializer for collection types.
+ */
+@Internal
+public final class CollectionSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public CollectionSerializerConfigSnapshot() {}
+
+	public CollectionSerializerConfigSnapshot(TypeSerializerConfigSnapshot elementSerializerConfigSnapshot) {
+		super(elementSerializerConfigSnapshot);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}
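
As a small sketch of the intended nesting, a serializer for a collection type (e.g. a list serializer) would place its element serializer's snapshot inside this composite snapshot. Constructing one directly looks as follows, assuming that LongSerializer's snapshotConfiguration() returns its (parameterless) config snapshot:

	import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
	import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
	import org.apache.flink.api.common.typeutils.base.LongSerializer;

	public class CollectionSnapshotSketch {

		public static void main(String[] args) {
			// the element serializer's snapshot is nested inside the collection's snapshot
			TypeSerializerConfigSnapshot elementSnapshot = LongSerializer.INSTANCE.snapshotConfiguration();
			CollectionSerializerConfigSnapshot collectionSnapshot =
				new CollectionSerializerConfigSnapshot(elementSnapshot);

			System.out.println(collectionSnapshot.getVersion()); // prints 1, the snapshot's own version
		}
	}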

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 3f27de2..28ed904 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -101,4 +101,11 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof DateSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(SqlDateSerializer.class.getCanonicalName())
+			|| identifier.equals(SqlTimeSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 375cb9c..92fe71d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -33,7 +33,6 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	
 	private static final Double ZERO = Double.valueOf(0);
 
-	
 	@Override
 	public boolean isImmutableType() {
 		return true;
@@ -83,4 +82,10 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof DoubleSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(DoubleValueSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 232ad6b..9e7e8d0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -31,8 +31,7 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	private static final long serialVersionUID = 1L;
 	
 	public static final DoubleValueSerializer INSTANCE = new DoubleValueSerializer();
-	
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -84,4 +83,10 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	public boolean canEqual(Object obj) {
 		return obj instanceof DoubleValueSerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(DoubleSerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c45487f..2f74d84 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -20,11 +20,24 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -36,7 +49,21 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 
 	private final Class<T> enumClass;
 
-	private transient T[] values;
+	/**
+	 * Maintain our own map of enum value to their ordinal, instead of directly using {@link Enum#ordinal}.
+	 * This allows us to maintain backwards compatibility for previously serialized data in the case that the
+	 * order of enum constants was changed or new constants were added.
+	 *
+	 * <p>On a fresh start with no reconfiguration, the ordinals would simply be identical to the enum
+	 * constants' actual ordinals. Ordinals may change after reconfiguration.
+	 */
+	private Map<T, Integer> valueToOrdinal;
+
+	/**
+	 * Array of enum constants whose indexes are identical to the constants' ordinals in the {@link #valueToOrdinal} map.
+	 * Together with that map, it forms a bidirectional mapping for fast access from ordinal to value. May be reordered after reconfiguration.
+	 */
+	private T[] values;
 
 	public EnumSerializer(Class<T> enumClass) {
 		this.enumClass = checkNotNull(enumClass);
@@ -44,6 +71,12 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 
 		this.values = enumClass.getEnumConstants();
 		checkArgument(this.values.length > 0, "cannot use an empty enum");
+
+		this.valueToOrdinal = new HashMap<>(values.length);
+		int i = 0;
+		for (T value : values) {
+			this.valueToOrdinal.put(value, i++);
+		}
 	}
 
 	@Override
@@ -78,7 +111,8 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 
 	@Override
 	public void serialize(T record, DataOutputView target) throws IOException {
-		target.writeInt(record.ordinal());
+		// use our own maintained ordinals instead of the actual enum ordinal
+		target.writeInt(valueToOrdinal.get(record));
 	}
 
 	@Override
@@ -121,6 +155,141 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		this.values = enumClass.getEnumConstants();
+
+		// may be null if this serializer was deserialized from an older version
+		if (this.values == null) {
+			this.values = enumClass.getEnumConstants();
+
+			this.valueToOrdinal = new HashMap<>(values.length);
+			int i = 0;
+			for (T value : values) {
+				this.valueToOrdinal.put(value, i++);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public EnumSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new EnumSerializerConfigSnapshot<>(enumClass, values);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof EnumSerializerConfigSnapshot) {
+			final EnumSerializerConfigSnapshot<T> config = (EnumSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (enumClass.equals(config.getTypeClass())) {
+
+				// reorder enum constants so that previously existing constants
+				// remain in the same order, and newly added constants are appended at the end
+				LinkedHashSet<T> reorderedEnumConstants = new LinkedHashSet<>();
+				reorderedEnumConstants.addAll(Arrays.asList(config.getEnumConstants()));
+				reorderedEnumConstants.addAll(Arrays.asList(enumClass.getEnumConstants()));
+
+				// regenerate enum constant to ordinal bidirectional map
+				this.values = (T[]) Array.newInstance(enumClass, reorderedEnumConstants.size());
+				this.valueToOrdinal.clear();
+				int i = 0;
+				for (T constant : reorderedEnumConstants) {
+					this.values[i] = constant;
+					this.valueToOrdinal.put(constant, i);
+					i++;
+				}
+
+				return CompatibilityResult.compatible();
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	/**
+	 * Configuration snapshot of a serializer for enumerations.
+	 *
+	 * <p>The configuration contains the enum class and an array of the enum's constants
+	 * that existed when the configuration snapshot was taken.
+	 *
+	 * @param <T> the enum type.
+	 */
+	public static final class EnumSerializerConfigSnapshot<T extends Enum<T>>
+			extends GenericTypeSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		private T[] enumConstants;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public EnumSerializerConfigSnapshot() {}
+
+		public EnumSerializerConfigSnapshot(Class<T> enumClass, T[] enumConstants) {
+			super(enumClass);
+			this.enumConstants = Preconditions.checkNotNull(enumConstants);
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) {
+				InstantiationUtil.serializeObject(outViewWrapper, enumConstants);
+			}
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
+				try {
+					enumConstants = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+				} catch (ClassNotFoundException e) {
+					throw new IOException("The requested enum class cannot be found in the classpath.", e);
+				} catch (IllegalArgumentException e) {
+					throw new IOException("A previously existing enum constant of "
+						+ getTypeClass().getName() + " no longer exists.", e);
+				}
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		public T[] getEnumConstants() {
+			return enumConstants;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return super.equals(obj)
+					&& Arrays.equals(
+						enumConstants,
+						((EnumSerializerConfigSnapshot) obj).getEnumConstants());
+		}
+
+		@Override
+		public int hashCode() {
+			return super.hashCode() * 31 + Arrays.hashCode(enumConstants);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	@VisibleForTesting
+	T[] getValues() {
+		return values;
+	}
+
+	@VisibleForTesting
+	Map<T, Integer> getValueToOrdinal() {
+		return valueToOrdinal;
 	}
 }
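
To illustrate the reconfiguration above, the following sketch uses a hypothetical Color enum; imagine that when the snapshot was taken the enum declared RED, GREEN, BLUE (in that order), and that the class has since been reordered and extended with YELLOW:

	import org.apache.flink.api.common.typeutils.CompatibilityResult;
	import org.apache.flink.api.common.typeutils.base.EnumSerializer;

	public class EnumReconfigurationSketch {

		// hypothetical current declaration: order changed and YELLOW added since the checkpoint
		enum Color { BLUE, GREEN, RED, YELLOW }

		public static void main(String[] args) {
			// constants as they existed, and were ordered, when the snapshot was taken
			Color[] checkpointedConstants = { Color.RED, Color.GREEN, Color.BLUE };
			EnumSerializer.EnumSerializerConfigSnapshot<Color> snapshot =
				new EnumSerializer.EnumSerializerConfigSnapshot<>(Color.class, checkpointedConstants);

			EnumSerializer<Color> serializer = new EnumSerializer<>(Color.class);
			CompatibilityResult<Color> result = serializer.ensureCompatibility(snapshot);

			// the result is compatible: previously existing constants keep their checkpointed
			// ordinals (RED=0, GREEN=1, BLUE=2) and YELLOW is appended as 3, so an ordinal 0
			// read from old state still maps to RED even though Color.RED.ordinal() is now 2
		}
	}

If the enum class in the snapshot does not match the serializer's enum class, ensureCompatibility instead returns CompatibilityResult.requiresMigration(null).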