You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 06:00:01 UTC

[GitHub] tzulitai closed pull request #6930: [FLINK-10679] Remove deprecated CompatibilityResult and related classes from framework code

tzulitai closed pull request #6930: [FLINK-10679] Remove deprecated CompatibilityResult and related classes from framework code
URL: https://github.com/apache/flink/pull/6930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
index 49d03db9830..4b72902264f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
@@ -24,6 +24,7 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 /**
@@ -48,13 +49,13 @@ public BackwardsCompatibleSerializerSnapshot(TypeSerializer<T> serializerInstanc
 	}
 
 	@Override
-	public void write(DataOutputView out) throws IOException {
+	public void writeSnapshot(DataOutputView out) throws IOException {
 		throw new UnsupportedOperationException(
 			"This is a dummy config snapshot used only for backwards compatibility.");
 	}
 
 	@Override
-	public void read(int version, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+	public void readSnapshot(int version, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 		throw new UnsupportedOperationException(
 			"This is a dummy config snapshot used only for backwards compatibility.");
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index a188a4d23db..ae19d038c39 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -68,7 +68,9 @@
 		if (precedingSerializerConfigSnapshot != null
 			&& !(precedingSerializerConfigSnapshot instanceof BackwardsCompatibleSerializerSnapshot)) {
 
-			CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+			CompatibilityResult<T> initialResult = resolveCompatibilityResult(
+					(TypeSerializerSnapshot<T>) precedingSerializerConfigSnapshot,
+					newSerializer);
 
 			if (!initialResult.isRequiresMigration()) {
 				return initialResult;
@@ -89,4 +91,19 @@
 		}
 	}
 
+	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+			TypeSerializerSnapshot<T> precedingSerializerConfigSnapshot,
+			TypeSerializer<T> newSerializer) {
+
+		TypeSerializerSchemaCompatibility<T, TypeSerializer<T>> compatibility =
+				precedingSerializerConfigSnapshot.resolveSchemaCompatibility(newSerializer);
+
+		// everything except "compatible" maps to "requires migration".
+		// at the entry point of the new-to-old-bridge (in the TypeSerializerConfigSnapshot), we
+		// interpret "requiresMigration" as 'incompatible'. That is a precaution because
+		// serializers could previously not specify the 'incompatible' case.
+		return compatibility.isCompatibleAsIs() ?
+				CompatibilityResult.compatible() :
+				CompatibilityResult.requiresMigration();
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..ddb0b87e525 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -29,11 +28,37 @@
 /**
  * This interface describes the methods that are required for a data type to be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and copying methods.
- * <p>
- * The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful
+ *
+ * <p>The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful
  * implementations of the methods may lead to unpredictable side effects and will compromise both stability and
  * correctness of the program.
- * 
+ *
+ * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
+ *
+ * <p>This section is relevant if you implemented a TypeSerializer in Flink versions up to 1.6 and want
+ * to adapt that implementation to the new interfaces that support proper state schema evolution. Please
+ * follow these steps:
+ *
+ * <ul>
+ *     <li>Change the type serializer's config snapshot to implement {@link TypeSerializerSnapshot}, rather
+ *     than extending {@code TypeSerializerConfigSnapshot} (as previously).
+ *     <li>Move the compatibility check from the {@link TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)}
+ *     method to the {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} method.
+ * </ul>
+ *
+ * <p><b>Maintaining Backwards Compatibility</b>
+ *
+ * <p>If you want your serializer to be able to restore checkpoints from Flink 1.6 and before, add the steps
+ * below in addition to the steps above.
+ *
+ * <ul>
+ *     <li>Retain the old serializer snapshot class (extending {@code TypeSerializerConfigSnapshot}) under
+ *     the same name and give the updated serializer snapshot class (the one extending {@code TypeSerializerSnapshot})
+ *     a new name.
+ *     <li>Keep the {@link TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)} on the TypeSerializer
+ *     as well.
+ * </ul>
+ *
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
@@ -163,85 +188,55 @@
 	public abstract int hashCode();
 
 	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
+	// Serializer configuration snapshot for checkpoints/savepoints
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
-	 * registered to (if any - this method is only relevant if this serializer is registered for serialization of
-	 * managed state).
+	 * Snapshots the configuration of this TypeSerializer. This method is only relevant if the serializer is
+	 * used to state stored in checkpoints/savepoints.
 	 *
-	 * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
-	 * serialization format. When a new serializer is registered to serialize the same managed state that this
-	 * serializer was registered to, the returned configuration snapshot can be used to ensure compatibility
-	 * of the new serializer and determine if state migration is required.
+	 * <p>The snapshot of the TypeSerializer is supposed to contain all information that affects the serialization
+	 * format of the serializer. The snapshot serves two purposes: First, to reproduce the serializer when the
+	 * checkpoint/savepoint is restored, and second, to check whether the serialization format is compatible
+	 * with the serializer used in the restored program.
 	 *
-	 * @see TypeSerializerSnapshot
+	 * <p><b>IMPORTANT:</b> TypeSerializerSnapshots changed after Flink 1.6. Serializers implemented against
+	 * Flink versions up to 1.6 should still work, but adjust to new model to enable state evolution and be
+	 * future-proof.
+	 * See the class-level comments, section "Upgrading TypeSerializers to the new TypeSerializerSnapshot model"
+	 * for details.
+	 *
+	 * @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
 	 *
 	 * @return snapshot of the serializer's current configuration (cannot be {@code null}).
 	 */
 	public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
 
+	// --------------------------------------------------------------------------------------------
+	//  Deprecated methods for backwards compatibility
+	// --------------------------------------------------------------------------------------------
+
 	/**
-	 * Ensure compatibility of this serializer with a preceding serializer that was registered for serialization of
-	 * the same managed state (if any - this method is only relevant if this serializer is registered for
-	 * serialization of managed state).
-	 *
-	 * <p>The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
-	 * snapshot. The method may reconfigure the serializer (if required and possible) so that it may be compatible,
-	 * or provide a signaling result that informs Flink that state migration is necessary before continuing to use
-	 * this serializer.
-	 *
-	 * <p>The result can be one of the following:
-	 * <ul>
-	 *     <li>{@link CompatibilityResult#compatible()}: this signals Flink that this serializer is compatible, or
-	 *     has been reconfigured to be compatible, to continue reading previous data, and that the
-	 *     serialization schema remains the same. No migration needs to be performed.</li>
-	 *
-	 *     <li>{@link CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink that
-	 *     migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
-	 *     compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
-	 *     restored to read the previous data to perform the migration, the provided convert deserializer can be
-	 *     used as a fallback resort.</li>
+	 * This method is deprecated. It used to resolved compatibility of the serializer with serializer
+	 * config snapshots in checkpoints. The responsibility for this has moved to
+	 * {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
 	 *
-	 *     <li>{@link CompatibilityResult#requiresMigration()}: this signals Flink that migration needs to be
-	 *     performed, because this serializer is not compatible, or cannot be reconfigured to be compatible, for
-	 *     previous data. If the preceding serializer cannot be found (either its implementation changed or it was
-	 *     removed from the classpath) then the migration will fail due to incapability to read previous data.</li>
-	 * </ul>
+	 * <p>New serializers should not override this method any more! Serializers implemented against Flink
+	 * versions up to 1.6 should still work, but should adjust to new model to enable state evolution and
+	 * be future-proof. See the class-level comments, section <i>"Upgrading TypeSerializers to the new
+	 * TypeSerializerSnapshot model"</i> for details.
 	 *
-	 * @see CompatibilityResult
-	 *
-	 * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
-	 *
-	 * @return the determined compatibility result (cannot be {@code null}).
+	 * @deprecated Replaced by {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
 	 */
 	@Deprecated
 	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		throw new IllegalStateException(
-			"Seems like that you are still using TypeSerializerConfigSnapshot; if so, this method must be implemented. " +
-				"Once you change to directly use TypeSerializerSnapshot, then you can safely remove the implementation " +
-				"of this method.");
-	}
-
-	@Internal
-	public final CompatibilityResult<T> ensureCompatibility(TypeSerializerSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof TypeSerializerConfigSnapshot) {
-			return ensureCompatibility((TypeSerializerConfigSnapshot<?>) configSnapshot);
-		} else {
-			@SuppressWarnings("unchecked")
-			TypeSerializerSnapshot<T> casted = (TypeSerializerSnapshot<T>) configSnapshot;
-
-			TypeSerializerSchemaCompatibility<T, ? extends TypeSerializer<T>> compat = casted.resolveSchemaCompatibility(this);
-			if (compat.isCompatibleAsIs()) {
-				return CompatibilityResult.compatible();
-			} else if (compat.isCompatibleAfterMigration()) {
-				return CompatibilityResult.requiresMigration();
-			} else if (compat.isIncompatible()) {
-				throw new IllegalStateException("The new serializer is incompatible.");
-			} else {
-				throw new IllegalStateException("Unidentifiable schema compatibility type. This is a bug, please file a JIRA.");
-			}
-		}
+		throw new UnsupportedOperationException(
+				"This method is not supported any more - please evolve your TypeSerializer the following way:\n\n" +
+				"  - If you have a serializer whose 'ensureCompatibility()' method delegates to another\n" +
+				"    serializer's 'ensureCompatibility()', please use" +
+						"'CompatibilityUtil.resolveCompatibilityResult(snapshot, this)' instead.\n\n" +
+				"  - If you updated your serializer (removed overriding the 'ensureCompatibility()' method),\n" +
+				"    please also update the corresponding config snapshot to not extend 'TypeSerializerConfigSnapshot'" +
+						"any more.\n\n");
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 236b994b295..9ae275a056a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -22,92 +22,39 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
- * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted within checkpoints
- * as a single source of meta information about the schema of serialized data in the checkpoint.
- * This serves three purposes:
- *
- * <ul>
- *   <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot
- *   represents information about the parameters, state, and schema of a serializer.
- *   This is explained in more detail below.</li>
- *
- *   <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available,
- *   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
- *   This is performed by providing the new serializer to the corresponding serializer configuration
- *   snapshots in checkpoints.</li>
- *
- *   <li><strong>Factory for a read serializer when schema conversion is required:<strong> in the case that new
- *   serializers are not compatible to read previous data, a schema conversion process executed across all data
- *   is required before the new serializer can be continued to be used. This conversion process requires a compatible
- *   read serializer to restore serialized bytes as objects, and then written back again using the new serializer.
- *   In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read
- *   serializer of the conversion process.</li>
- * </ul>
- *
- * <h2>Serializer Configuration and Schema</h2>
- *
- * <p>Since serializer configuration snapshots needs to be used to ensure serialization compatibility
- * for the same managed state as well as serving as a factory for compatible read serializers, the configuration
- * snapshot should encode sufficient information about:
- *
- * <ul>
- *   <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
- *   required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
- *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
- *   serializers.</li>
+ * This class bridges between the old serializer config snapshot interface (this class) and the new
+ * serializer config snapshot interface ({@link TypeSerializerSnapshot}).
  *
- *   <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or
- *   in other words, the schema of data written by the serializer.</li>
- * </ul>
- *
- * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
- * deserialize the configuration snapshot from its binary form.
- *
- * @param <T> The data type that the originating serializer of this configuration serializes.
- *
- * @deprecated This class has been deprecated since Flink 1.7, and will eventually be removed.
- *             Please refer to, and directly implement a {@link TypeSerializerSnapshot} instead.
- *             Class-level Javadocs of {@link TypeSerializerSnapshot} provides more details
- *             on migrating to the new interface.
+ * <p>Serializers that create snapshots and compatibility checks with the old interfaces extends this class
+ * and should migrate to extend {@code TypeSerializerSnapshot} to properly support state evolution/migration
+ * and be future-proof.
  */
 @PublicEvolving
 @Deprecated
 public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable implements TypeSerializerSnapshot<T> {
 
+	/** Version / Magic number for the format that bridges between the old and new interface. */
+	private static final int ADAPTER_VERSION = 0x7a53c4f0;
+
 	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
 	private ClassLoader userCodeClassLoader;
 
-	/**
-	 * The originating serializer of this configuration snapshot.
-	 */
+	/** The originating serializer of this configuration snapshot. */
 	private TypeSerializer<T> serializer;
 
-	/**
-	 * Creates a serializer using this configuration, that is capable of reading data
-	 * written by the serializer described by this configuration.
-	 *
-	 * @return the restored serializer.
-	 */
-	public TypeSerializer<T> restoreSerializer() {
-		if (serializer != null) {
-			return this.serializer;
-		} else {
-			throw new IllegalStateException("Trying to restore the prior serializer via TypeSerializerConfigSnapshot, " +
-				"but the prior serializer has not been set.");
-		}
-	}
-
 	/**
 	 * Set the originating serializer of this configuration snapshot.
 	 */
 	@Internal
-	public void setPriorSerializer(TypeSerializer<T> serializer) {
+	public final void setPriorSerializer(TypeSerializer<T> serializer) {
 		this.serializer = Preconditions.checkNotNull(serializer);
 	}
 
@@ -135,26 +82,77 @@ public final ClassLoader getUserCodeClassLoader() {
 		return userCodeClassLoader;
 	}
 
-	public abstract boolean equals(Object obj);
-
-	public abstract int hashCode();
-
 	// ----------------------------------------------------------------------------
-	//  Irrelevant methods; these methods should only ever be used when the new interface is directly implemented.
+	//  Implementation of the TypeSerializerSnapshot interface
 	// ----------------------------------------------------------------------------
 
 	@Override
-	public int getCurrentVersion() {
-		throw new UnsupportedOperationException();
+	public final int getCurrentVersion() {
+		return ADAPTER_VERSION;
+	}
+
+	@Override
+	public final void writeSnapshot(DataOutputView out) throws IOException {
+		checkState(serializer != null, "the prior serializer has not been set on this");
+
+		// write the snapshot for a non-updated serializer.
+		// this mimics the previous behavior where the TypeSerializer was
+		// Java-serialized, for backwards compatibility
+		TypeSerializerSerializationUtil.writeSerializer(out, serializer);
+
+		// now delegate to the snapshots own writing code
+		write(out);
+	}
+
+	@Override
+	public final void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		if (readVersion != ADAPTER_VERSION) {
+			throw new IOException("Wrong/unexpected version for the TypeSerializerConfigSnapshot: " + readVersion);
+		}
+
+		serializer = TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true);
+
+		// now delegate to the snapshots own reading code
+		setUserCodeClassLoader(userCodeClassLoader);
+		read(in);
 	}
 
+	/**
+	 * Creates a serializer using this configuration, that is capable of reading data
+	 * written by the serializer described by this configuration.
+	 *
+	 * @return the restored serializer.
+	 */
 	@Override
-	public final void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		throw new UnsupportedOperationException();
+	public final TypeSerializer<T> restoreSerializer() {
+		if (serializer == null) {
+			throw new IllegalStateException(
+					"Trying to restore the prior serializer via TypeSerializerConfigSnapshot, " +
+					"but the prior serializer has not been set.");
+		}
+		else if (serializer instanceof UnloadableDummyTypeSerializer) {
+			Throwable originalError = ((UnloadableDummyTypeSerializer<?>) serializer).getOriginalError();
+
+			throw new IllegalStateException(
+					"Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer " +
+					"snapshot '" + getClass().getName() + "'. " +
+					"Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid " +
+					"this problem in the future.", originalError);
+		} else {
+			return this.serializer;
+		}
 	}
 
 	@Override
-	public final <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer) {
-		throw new UnsupportedOperationException();
+	public final <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+			NS newSerializer) {
+
+		// in prior versions, the compatibility check was in the serializer itself, so we
+		// delegate this call to the serializer.
+		final CompatibilityResult<T> compatibility = newSerializer.ensureCompatibility(this);
+
+		return compatibility.isRequiresMigration() ?
+				TypeSerializerSchemaCompatibility.incompatible() :
+				TypeSerializerSchemaCompatibility.compatibleAsIs();
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index e7cdc382f60..7a4ee5bd606 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -118,7 +118,7 @@
 		} catch (UnloadableTypeSerializerException e) {
 			if (useDummyPlaceholder) {
 				LOG.warn("Could not read a requested serializer. Replaced with a UnloadableDummyTypeSerializer.", e.getCause());
-				return new UnloadableDummyTypeSerializer<>(e.getSerializerBytes());
+				return new UnloadableDummyTypeSerializer<>(e.getSerializerBytes(), e.getCause());
 			} else {
 				throw e;
 			}
@@ -232,8 +232,6 @@ public static void writeSerializersAndConfigsWithResilience(
 	 */
 	public static final class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
 
-		private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
-
 		private static final int VERSION = 1;
 
 		private ClassLoader userClassLoader;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
index 067d43d480d..4715d1119bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -25,7 +25,7 @@
 import java.io.IOException;
 
 /**
- * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration.
+ * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
  * The configuration snapshot of a serializer is persisted within checkpoints
  * as a single source of meta information about the schema of serialized data in the checkpoint.
  * This serves three purposes:
@@ -40,7 +40,7 @@
  *   This is performed by providing the new serializer to the correspondibng serializer configuration
  *   snapshots in checkpoints.</li>
  *
- *   <li><strong>Factory for a read serializer when schema conversion is required:<strong> in the case that new
+ *   <li><strong>Factory for a read serializer when schema conversion is required:</strong> in the case that new
  *   serializers are not compatible to read previous data, a schema conversion process executed across all data
  *   is required before the new serializer can be continued to be used. This conversion process requires a compatible
  *   read serializer to restore serialized bytes as objects, and then written back again using the new serializer.
@@ -86,9 +86,9 @@
 	 *
 	 * @param out the {@link DataOutputView} to write the snapshot to.
 	 *
-	 * @throws IOException
+	 * @throws IOException Thrown if the snapshot data could not be written.
 	 */
-	void write(DataOutputView out) throws IOException;
+	void writeSnapshot(DataOutputView out) throws IOException;
 
 	/**
 	 * Reads the serializer snapshot from the provided {@link DataInputView}.
@@ -100,9 +100,9 @@
 	 * @param in the {@link DataInputView} to read the snapshot from.
 	 * @param userCodeClassLoader the user code classloader
 	 *
-	 * @throws IOException
+	 * * @throws IOException Thrown if the snapshot data could be read or parsed.
 	 */
-	void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
+	void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
 
 	/**
 	 * Recreates a serializer instance from this snapshot. The returned
@@ -114,8 +114,16 @@
 	TypeSerializer<T> restoreSerializer();
 
 	/**
-	 * Checks a new serializer's compatibility to read data written by the prior
-	 * serializer.
+	 * Checks a new serializer's compatibility to read data written by the prior serializer.
+	 *
+	 * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+	 * format of the data in the checkpoint/savepoint is compatible for the format of the serializer used by the
+	 * program that restores the checkpoint/savepoint. The outcome can be that the serialization format is
+	 * compatible, that the program's serializer needs to reconfigure itself (meaning to incorporate some
+	 * information from the TypeSerializerSnapshot to be compatible), that the format is outright incompatible,
+	 * or that a migration needed. In the latter case, the TypeSerializerSnapshot produces a serializer to
+	 * deserialize the data, and the restoring program's serializer re-serializes the data, thus converting
+	 * the format during the restore operation.
 	 *
 	 * @param newSerializer the new serializer to check.
 	 * @param <NS> the type of the new serializer
@@ -124,4 +132,35 @@
 	 */
 	<NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer);
 
+	// ------------------------------------------------------------------------
+	//  read / write utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the given snapshot to the out stream. One should always use this method to write
+	 * snapshots out, rather than directly calling {@link #writeSnapshot(DataOutputView)}.
+	 *
+	 * <p>The snapshot written with this method can be read via {@link #readVersionedSnapshot(DataInputView, ClassLoader)}.
+	 */
+	static void writeVersionedSnapshot(DataOutputView out, TypeSerializerSnapshot<?> snapshot) throws IOException {
+		out.writeUTF(snapshot.getClass().getName());
+		out.writeInt(snapshot.getCurrentVersion());
+		snapshot.writeSnapshot(out);
+	}
+
+
+	/**
+	 * Reads a snapshot from the stream, performing resolving
+	 *
+	 * <p>This method reads snapshots written by {@link #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)}.
+	 */
+	static <T> TypeSerializerSnapshot<T> readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+		final TypeSerializerSnapshot<T> snapshot =
+				TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);
+
+		final int version = in.readInt();
+		snapshot.readSnapshot(version, in, cl);
+
+		return snapshot;
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
index 0bcff93b802..8e12e301cbb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -25,8 +26,11 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Utility methods for serialization of {@link TypeSerializerSnapshot}.
  */
@@ -42,8 +46,6 @@
 	 * @param serializerSnapshot the serializer configuration snapshot to write
 	 * @param serializer the prior serializer. This needs to be written of the serializer snapshot
 	 *                   if the serializer snapshot is still the legacy {@link TypeSerializerConfigSnapshot}.
-	 *
-	 * @throws IOException
 	 */
 	public static <T> void writeSerializerSnapshot(
 		DataOutputView out,
@@ -63,8 +65,6 @@
 	 *                                restoring from a snapshot taken with Flink version <= 1.6.
 	 *
 	 * @return the read serializer configuration snapshot
-	 *
-	 * @throws IOException
 	 */
 	public static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
 			DataInputView in,
@@ -78,6 +78,30 @@
 		return proxy.getSerializerSnapshot();
 	}
 
+
+	public static <T> TypeSerializerSnapshot<T> readAndInstantiateSnapshotClass(DataInputView in, ClassLoader cl) throws IOException {
+		final String className = in.readUTF();
+
+		final Class<? extends TypeSerializerSnapshot> rawClazz;
+		try {
+			rawClazz = Class
+					.forName(className, false, cl)
+					.asSubclass(TypeSerializerSnapshot.class);
+		}
+		catch (ClassNotFoundException e) {
+			throw new IOException(
+					"Could not find requested TypeSerializerSnapshot class '" + className +  "' in classpath.", e);
+		}
+		catch (ClassCastException e) {
+			throw new IOException("The class '" + className + "' is not a subclass of TypeSerializerSnapshot.", e);
+		}
+
+		@SuppressWarnings("unchecked")
+		final Class<? extends TypeSerializerSnapshot<T>> clazz = (Class<? extends TypeSerializerSnapshot<T>>) rawClazz;
+
+		return InstantiationUtil.instantiate(clazz);
+	}
+
 	/**
 	 * Utility serialization proxy for a {@link TypeSerializerSnapshot}.
 	 */
@@ -87,8 +111,11 @@
 
 		private ClassLoader userCodeClassLoader;
 		private TypeSerializerSnapshot<T> serializerSnapshot;
-		private TypeSerializer<T> serializer;
+		@Nullable private TypeSerializer<T> serializer;
 
+		/**
+		 * Constructor for reading serializers.
+		 */
 		TypeSerializerSnapshotSerializationProxy(
 			ClassLoader userCodeClassLoader,
 			@Nullable TypeSerializer<T> existingPriorSerializer) {
@@ -96,6 +123,9 @@
 			this.serializer = existingPriorSerializer;
 		}
 
+		/**
+		 * Constructor for writing out serializers.
+		 */
 		TypeSerializerSnapshotSerializationProxy(
 			TypeSerializerSnapshot<T> serializerConfigSnapshot,
 			TypeSerializer<T> serializer) {
@@ -107,91 +137,39 @@
 		 * Binary format layout of a written serializer snapshot is as follows:
 		 *
 		 * <ul>
-		 *     <li>1. Serializer snapshot classname (UTF).</li>
-		 *     <li>2. The originating serializer of the snapshot, if any, written via Java serialization.
-		 *         Presence of the serializer is indicated by a flag (boolean -> TypeSerializer).</li>
-		 *     <li>3. The version of the serializer snapshot's binary format.</li>
-		 *     <li>4. The actual serializer snapshot.</li>
+		 *     <li>1. Format version of this util.</li>
+		 *     <li>2. Name of the TypeSerializerSnapshot class.</li>
+		 *     <li>3. The version of the TypeSerializerSnapshot's binary format.</li>
+		 *     <li>4. The actual serializer snapshot data.</li>
 		 * </ul>
 		 */
+		@SuppressWarnings("deprecation")
 		@Override
 		public void write(DataOutputView out) throws IOException {
-			super.write(out);
+			setSerializerForWriteIfOldPath(serializerSnapshot, serializer);
 
-			// config snapshot class, so that we can re-instantiate the
-			// correct type of config snapshot instance when deserializing
-			out.writeUTF(serializerSnapshot.getClass().getName());
+			// write the format version of this utils format
+			super.write(out);
 
-			if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
-				// backwards compatible path, where the serializer snapshot is still using the
-				// deprecated interface; the originating serializer needs to be written to the byte stream
-				out.writeBoolean(true);
-				@SuppressWarnings("unchecked")
-				TypeSerializerConfigSnapshot<T> legacySerializerSnapshot = (TypeSerializerConfigSnapshot<T>) serializerSnapshot;
-				TypeSerializerSerializationUtil.writeSerializer(out, serializer);
-
-				// TypeSerializerConfigSnapshot includes the version number implicitly when it is written
-				legacySerializerSnapshot.write(out);
-			} else {
-				out.writeBoolean(false);
-
-				out.writeInt(serializerSnapshot.getCurrentVersion());
-				serializerSnapshot.write(out);
-			}
+			TypeSerializerSnapshot.writeVersionedSnapshot(out, serializerSnapshot);
 		}
 
 		@SuppressWarnings("unchecked")
 		@Override
 		public void read(DataInputView in) throws IOException {
+			// read version
 			super.read(in);
-
-			String serializerConfigClassname = in.readUTF();
-			Class<? extends TypeSerializerSnapshot> serializerConfigSnapshotClass;
-			try {
-				serializerConfigSnapshotClass = (Class<? extends TypeSerializerSnapshot>)
-					Class.forName(serializerConfigClassname, false, userCodeClassLoader);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(
-					"Could not find requested TypeSerializerConfigSnapshot class "
-						+ serializerConfigClassname +  " in classpath.", e);
-			}
-
-			serializerSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-
-			if (getReadVersion() >= 2) {
-				// Flink version after 1.7
-
-				boolean containsPriorSerializer = in.readBoolean();
-
-				TypeSerializer<T> priorSerializer = (containsPriorSerializer)
-					? TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)
-					: null;
-
-				if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
-					if (priorSerializer != null) {
-						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setPriorSerializer(priorSerializer);
-						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
-						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
-					} else {
-						// this occurs if the user changed a TypeSerializerSnapshot to the
-						// legacy TypeSerializerConfigSnapshot, which isn't supported.
-						throw new IOException("Cannot read a legacy TypeSerializerConfigSnapshot without the prior serializer present. ");
-					}
-				} else {
-					int readVersion = in.readInt();
-					serializerSnapshot.read(readVersion, in, userCodeClassLoader);
-				}
-			} else {
-				// Flink version before 1.7.x, and after 1.3.x
-
-				if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
-					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setPriorSerializer(this.serializer);
-					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
-					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
-				} else {
-					int readVersion = in.readInt();
-					serializerSnapshot.read(readVersion, in, userCodeClassLoader);
-				}
+			final int version = getReadVersion();
+
+			switch (version) {
+				case 2:
+					serializerSnapshot = deserializeV2(in, userCodeClassLoader);
+					break;
+				case 1:
+					serializerSnapshot = deserializeV1(in, userCodeClassLoader, serializer);
+					break;
+				default:
+					throw new IOException("Unrecognized version for TypeSerializerSnapshot format: " + version);
 			}
 		}
 
@@ -208,5 +186,57 @@ public int getVersion() {
 		TypeSerializerSnapshot<T> getSerializerSnapshot() {
 			return serializerSnapshot;
 		}
+
+		/**
+		 * Deserialization path for Flink versions 1.7+.
+		 */
+		@VisibleForTesting
+		static <T> TypeSerializerSnapshot<T> deserializeV2(DataInputView in, ClassLoader cl) throws IOException {
+			return TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
+		}
+
+		/**
+		 * Deserialization path for Flink versions in [1.4, 1.6].
+		 */
+		@VisibleForTesting
+		@SuppressWarnings("deprecation")
+		static <T> TypeSerializerSnapshot<T> deserializeV1(
+				DataInputView in,
+				ClassLoader cl,
+				@Nullable TypeSerializer<T> serializer) throws IOException {
+
+			TypeSerializerSnapshot<T> snapshot = readAndInstantiateSnapshotClass(in, cl);
+
+			// if the snapshot was created before Flink 1.7, we need to distinguish the following cases:
+			//   - old snapshot type that needs serializer from the outside
+			//   - new snapshot type that understands the old format and can produce a restore serializer from it
+			if (snapshot instanceof TypeSerializerConfigSnapshot) {
+				TypeSerializerConfigSnapshot<T> oldTypeSnapshot = (TypeSerializerConfigSnapshot<T>) snapshot;
+				oldTypeSnapshot.setPriorSerializer(serializer);
+				oldTypeSnapshot.setUserCodeClassLoader(cl);
+				oldTypeSnapshot.read(in);
+			}
+			else {
+				// new type, simple case
+				int readVersion = in.readInt();
+				snapshot.readSnapshot(readVersion, in, cl);
+			}
+
+			return snapshot;
+		}
+
+		@SuppressWarnings("deprecation")
+		private static <T> void setSerializerForWriteIfOldPath(
+				TypeSerializerSnapshot<T> serializerSnapshot,
+				TypeSerializer<T> serializer) {
+
+			// for compatibility with non-upgraded serializers, put the serializer into the
+			// config snapshot if it of the old version
+			if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
+				checkState(serializer != null);
+
+				((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setPriorSerializer(serializer);
+			}
+		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
index 448c53b209e..fe55be79e12 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InvalidClassException;
 import java.util.Arrays;
@@ -33,16 +34,30 @@
 public class UnloadableDummyTypeSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 2526330533671642711L;
+
 	private final byte[] actualBytes;
 
+	@Nullable
+	private final Throwable originalError;
+
 	public UnloadableDummyTypeSerializer(byte[] actualBytes) {
+		this(actualBytes, null);
+	}
+
+	public UnloadableDummyTypeSerializer(byte[] actualBytes, @Nullable Throwable originalError) {
 		this.actualBytes = Preconditions.checkNotNull(actualBytes);
+		this.originalError = originalError;
 	}
 
 	public byte[] getActualBytes() {
 		return actualBytes;
 	}
 
+	@Nullable
+	public Throwable getOriginalError() {
+		return originalError;
+	}
+
 	@Override
 	public boolean isImmutableType() {
 		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a79ac1de6fc..9c6bcb50bd5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,7 +20,6 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -121,12 +120,8 @@ public void testSnapshotConfigurationAndReconfigure() throws Exception {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), getSerializer());
 		}
 
-		CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
-		assertFalse(strategy.isRequiresMigration());
-
-		// also verify that the serializer's reconfigure implementation detects incompatibility
-		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot<>());
-		assertTrue(strategy.isRequiresMigration());
+		TypeSerializerSchemaCompatibility<T, ? extends TypeSerializer<T>> strategy = restoredConfig.resolveSchemaCompatibility(getSerializer());
+		assertTrue(strategy.isCompatibleAsIs());
 	}
 
 	@Test
@@ -543,23 +538,6 @@ public void skipBytesToRead(int numBytes) throws IOException {
 		}
 	}
 
-	public static final class TestIncompatibleSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
-		@Override
-		public int getVersion() {
-			return 0;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof TestIncompatibleSerializerConfigSnapshot;
-		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
-	}
-
 	private static <T> void checkToString(T value) {
 		if (value != null) {
 			value.toString();
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 8f82ea88ed9..0fae2699807 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -106,8 +107,8 @@ public void testConfigurationSnapshotSerialization() throws Exception {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), serializer);
 		}
 
-		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig);
-		assertFalse(compatResult.isRequiresMigration());
+		TypeSerializerSchemaCompatibility<PublicEnum, ?> compatResult = restoredConfig.resolveSchemaCompatibility(serializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 
 		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
 		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index b4f983382f5..e906f62f65b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -56,7 +56,7 @@
 	 */
 	@Test
 	public void checkIndenticalEnums() throws Exception {
-		Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_A).isRequiresMigration());
+		Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_A).isCompatibleAsIs());
 	}
 
 	/**
@@ -64,7 +64,7 @@ public void checkIndenticalEnums() throws Exception {
 	 */
 	@Test
 	public void checkAppendedField() throws Exception {
-		Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_B).isRequiresMigration());
+		Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_B).isCompatibleAsIs());
 	}
 
 	/**
@@ -72,7 +72,7 @@ public void checkAppendedField() throws Exception {
 	 */
 	@Test
 	public void checkRemovedField() throws Exception {
-		Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_C).isRequiresMigration());
+		Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_C).isIncompatible());
 	}
 
 	/**
@@ -80,11 +80,11 @@ public void checkRemovedField() throws Exception {
 	 */
 	@Test
 	public void checkDifferentFieldOrder() throws Exception {
-		Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_D).isRequiresMigration());
+		Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_D).isCompatibleAsIs());
 	}
 
 	@SuppressWarnings("unchecked")
-	private static CompatibilityResult checkCompatibility(String enumSourceA, String enumSourceB)
+	private static TypeSerializerSchemaCompatibility checkCompatibility(String enumSourceA, String enumSourceB)
 		throws IOException, ClassNotFoundException {
 
 		ClassLoader classLoader = compileAndLoadEnum(
@@ -116,7 +116,7 @@ private static CompatibilityResult checkCompatibility(String enumSourceA, String
 		}
 
 		EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME));
-		return enumSerializer2.ensureCompatibility(restoredSnapshot);
+		return restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
 	}
 
 	private static ClassLoader compileAndLoadEnum(File root, String filename, String source) throws IOException {
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 9620846a4a6..4f54c3ee4d9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -34,13 +34,13 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -63,7 +63,6 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -311,8 +310,10 @@ public void testReconfigureWithDifferentPojoType() throws Exception {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer2);
 		}
 
-		CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertTrue(compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<SubTestUserClassA, ?> compatResult =
+			pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer2);
+		assertTrue(compatResult.isIncompatible());
 	}
 
 	/**
@@ -352,8 +353,10 @@ public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer);
 		}
 
-		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertTrue(!compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
+			pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 
 		// reconfigure - check reconfiguration result and that registration ids remains the same
 		//assertEquals(ReconfigureResult.COMPATIBLE, pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
@@ -397,8 +400,10 @@ public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro
 		}
 
 		// reconfigure - check reconfiguration result and that subclass serializer cache is repopulated
-		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertFalse(compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
+			pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -460,8 +465,10 @@ public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
 		// reconfigure - check reconfiguration result and that
 		// 1) subclass serializer cache is repopulated
 		// 2) registrations also contain the now registered subclasses
-		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertFalse(compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
+			pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -537,8 +544,9 @@ public void testReconfigureWithDifferentFieldOrder() throws Exception {
 				new HashMap<>()); // empty; irrelevant for this test
 
 		// reconfigure - check reconfiguration result and that fields are reordered to the previous order
-		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
-		assertFalse(compatResult.isRequiresMigration());
+		TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
+			mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 		int i = 0;
 		for (Field field : mockOriginalFieldOrder) {
 			assertEquals(field, pojoSerializer.getFields()[i]);
@@ -580,7 +588,7 @@ public void testSerializerSerializationFailureResilience() throws Exception{
 					pojoSerializer);
 		}
 
-		Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
+		Assert.assertTrue(deserializedConfig.resolveSchemaCompatibility(pojoSerializer).isCompatibleAsIs());
 		verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config, deserializedConfig);
 	}
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 3c9d990fb31..869941e9174 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -44,7 +44,6 @@
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -67,8 +66,11 @@ public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
 			kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForA);
 		}
-		CompatibilityResult<TestClass> compatResult = kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
-		assertFalse(compatResult.isRequiresMigration());
+
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+			kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForA);
+		assertTrue(compatResult.isCompatibleAsIs());
 	}
 
 	@Test
@@ -111,8 +113,10 @@ public void testMigrationStrategyWithDifferentKryoType() throws Exception {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForB);
 		}
 
-		CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
-		assertTrue(compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestClassB, ?> compatResult =
+			kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForB);
+		assertTrue(compatResult.isIncompatible());
 	}
 
 	@Test
@@ -272,8 +276,10 @@ public void testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio
 		}
 
 		// reconfigure - check reconfiguration result and that registration id remains the same
-		CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
-		assertFalse(compatResult.isRequiresMigration());
+		@SuppressWarnings("unchecked")
+		TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+			kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializer);
+		assertTrue(compatResult.isCompatibleAsIs());
 		assertEquals(testClassId, kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
 		assertEquals(testClassAId, kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
 		assertEquals(testClassBId, kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 00000000000..1dd56a775aa
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+	private static final String JAR_FILE = "maven-test-jar.jar";
+
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	@Test
+	public void testExternalProgram() {
+
+		LocalFlinkMiniCluster testMiniCluster = null;
+
+		try {
+			int parallelism = 4;
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
+			testMiniCluster.start();
+
+			String jarFile = JAR_FILE;
+			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+			TestEnvironment.setAsContext(
+				testMiniCluster,
+				parallelism,
+				Collections.singleton(new Path(jarFile)),
+				Collections.<URL>emptyList());
+
+			config.setString(JobManagerOptions.ADDRESS, "localhost");
+			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+			program.invokeInteractiveModeForExecution();
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			TestEnvironment.unsetAsContext();
+
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// ignore
+				}
+			}
+		}
+	}
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index 7b8763bfa2f..463f8f6f26d 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -36,7 +36,6 @@
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -105,10 +104,10 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
 		validateDeserialization(serializer);
 
 		// sanity check for the test: check that a PoJoSerializer and the original serializer work together
-		assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+		assertTrue(configSnapshot.resolveSchemaCompatibility(serializer).isCompatibleAsIs());
 
 		final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-		assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+		assertTrue(configSnapshot.resolveSchemaCompatibility(newSerializer).isCompatibleAsIs());
 
 		// deserialize the data and make sure this still works
 		validateDeserialization(newSerializer);
@@ -116,7 +115,7 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
 		TypeSerializerSnapshot<SimpleUser> nextSnapshot = newSerializer.snapshotConfiguration();
 		final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 
-		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
+		assertTrue(nextSnapshot.resolveSchemaCompatibility(nextSerializer).isCompatibleAsIs());
 
 		// deserialize the data and make sure this still works
 		validateDeserialization(nextSerializer);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
 			final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
 
-			@SuppressWarnings("unchecked")
-			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
+			// check whether new serializers are incompatible
+			TypeSerializerSnapshot<K> keySerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<K>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-			// check compatibility to determine if state migration is required
-			CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-					broadcastStateKeySerializer);
-
-			CompatibilityResult<V> valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getValueSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					broadcastStateValueSerializer);
-
-			if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				broadcastState.setStateMetaInfo(
-						new RegisteredBroadcastStateBackendMetaInfo<>(
-								name,
-								OperatorStateHandle.Mode.BROADCAST,
-								broadcastStateKeySerializer,
-								broadcastStateValueSerializer));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+			TypeSerializerSchemaCompatibility<K, ?> keyCompatibility =
+				keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+			if (keyCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
+			}
+
+			TypeSerializerSnapshot<V> valueSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<V>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility<V, ?> valueCompatibility =
+				valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+			if (valueCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
 			}
+
+			// new serializer is compatible; use it to replace the old serializer
+			broadcastState.setStateMetaInfo(
+					new RegisteredBroadcastStateBackendMetaInfo<>(
+							name,
+							OperatorStateHandle.Mode.BROADCAST,
+							broadcastStateKeySerializer,
+							broadcastStateValueSerializer));
 		}
 
 		accessedBroadcastStatesByName.put(name, broadcastState);
@@ -606,27 +596,19 @@ public void addAll(List<S> values) {
 
 			// check compatibility to determine if state migration is required
 			TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
-			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					metaInfo.getPartitionStateSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
-					restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-					newPartitionStateSerializer);
-
-			if (!stateCompatibility.isRequiresMigration()) {
-				// new serializer is compatible; use it to replace the old serializer
-				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
-			} else {
-				// TODO state migration currently isn't possible.
-
-				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
-				// since the state has already been deserialized to objects and we can just continue with
-				// the new serializer; we're deliberately failing here for now to have equal functionality with
-				// the RocksDB backend to avoid confusion for users.
-
-				throw StateMigrationException.notSupported();
+
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+			TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+				stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+			if (stateCompatibility.isIncompatible()) {
+				throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
 			}
+
+			partitionableListState.setStateMetaInfo(
+				new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
 		}
 
 		accessedStatesByName.put(name, partitionableListState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index ab6e8b1a046..585490dc75f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -63,9 +62,6 @@
 	/** This specifies if we use a compressed format write the key-groups */
 	private boolean usingKeyGroupCompression;
 
-	/** This specifies whether or not to use dummy {@link UnloadableDummyTypeSerializer} when serializers cannot be read. */
-	private boolean isSerializerPresenceRequired;
-
 	// TODO the keySerializer field should be removed, once all serializers have the restoreSerializer() method implemented
 	private TypeSerializer<K> keySerializer;
 	private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
@@ -74,9 +70,8 @@
 
 	private ClassLoader userCodeClassLoader;
 
-	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader, boolean isSerializerPresenceRequired) {
+	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
-		this.isSerializerPresenceRequired = isSerializerPresenceRequired;
 	}
 
 	public KeyedBackendSerializationProxy(
@@ -98,10 +93,6 @@ public KeyedBackendSerializationProxy(
 		return stateMetaInfoSnapshots;
 	}
 
-	public TypeSerializer<K> restoreKeySerializer() {
-		return keySerializerConfigSnapshot.restoreSerializer();
-	}
-
 	public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
 		return keySerializerConfigSnapshot;
 	}
@@ -163,10 +154,6 @@ public void read(DataInputView in) throws IOException {
 		}
 		this.keySerializer = null;
 
-		if (isSerializerPresenceRequired) {
-			checkSerializerPresence(this.keySerializerConfigSnapshot.restoreSerializer());
-		}
-
 		Integer metaInfoSnapshotVersion = META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.get(readVersion);
 		if (metaInfoSnapshotVersion == null) {
 			// this should not happen; guard for the future
@@ -181,22 +168,7 @@ public void read(DataInputView in) throws IOException {
 		for (int i = 0; i < numKvStates; i++) {
 			StateMetaInfoSnapshot snapshot = stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader);
 
-			if (isSerializerPresenceRequired) {
-				checkSerializerPresence(
-					snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
-				checkSerializerPresence(
-					snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-			}
 			stateMetaInfoSnapshots.add(snapshot);
 		}
 	}
-
-	private void checkSerializerPresence(TypeSerializer<?> serializer) throws IOException {
-		if (serializer instanceof UnloadableDummyTypeSerializer) {
-			throw new IOException("Unable to restore keyed state, because a previous serializer" +
-				" of the keyed state is not present The serializer could have been removed from the classpath, " +
-				" or its implementation have changed and could not be loaded. This is a temporary restriction that will" +
-				" be fixed in future versions.");
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 7f95ed70326..a92527a0371 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -191,23 +189,25 @@ public int hashCode() {
 		}
 
 		// check compatibility results to determine if state migration is required
-		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			null,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			newNamespaceSerializer);
+		@SuppressWarnings("unchecked")
+		TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull(
+			(TypeSerializerSnapshot<N>) restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+
+		TypeSerializerSchemaCompatibility<N, ?> namespaceCompatibility =
+			namespaceSerializerSnapshot.resolveSchemaCompatibility(newNamespaceSerializer);
 
 		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
-		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.restoreTypeSerializer(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			UnloadableDummyTypeSerializer.class,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			newStateSerializer);
-
-		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+
+		@SuppressWarnings("unchecked")
+		TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
+			(TypeSerializerSnapshot<S>) restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+		TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+			stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+		if (!namespaceCompatibility.isCompatibleAsIs() || !stateCompatibility.isCompatibleAsIs()) {
 			// TODO state migration currently isn't possible.
 			throw StateMigrationException.notSupported();
 		} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index b7dff59aef0..4132d144a4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,21 +42,4 @@ public String getName() {
 
 	@Nonnull
 	public abstract StateMetaInfoSnapshot snapshot();
-
-	public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
-
-		final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
-		switch (backendStateType) {
-			case KEY_VALUE:
-				return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
-			case OPERATOR:
-				return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
-			case BROADCAST:
-				return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
-			case PRIORITY_QUEUE:
-				return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
-			default:
-				throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 837e51fafc0..6ade53caf10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,10 +28,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -206,14 +205,15 @@ public HeapKeyedStateBackend(
 			StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
-			CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
-				null,
-				restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-				byteOrderedElementSerializer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+				(TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+			TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+				serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-			if (compatibilityResult.isRequiresMigration()) {
-				throw new FlinkRuntimeException(StateMigrationException.notSupported());
+			if (compatibilityResult.isIncompatible()) {
+				throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
 			} else {
 				registeredPQStates.put(
 					stateName,
@@ -405,26 +405,17 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
-				// isSerializerPresenceRequired flag is set to true, since for the heap state backend,
-				// deserialization of state happens eagerly at restore time
 				KeyedBackendSerializationProxy<K> serializationProxy =
-						new KeyedBackendSerializationProxy<>(userCodeClassLoader, true);
+						new KeyedBackendSerializationProxy<>(userCodeClassLoader);
 
 				serializationProxy.read(inView);
 
 				if (!keySerializerRestored) {
 					// check for key serializer compatibility; this also reconfigures the
 					// key serializer to be compatible, if it is required and is possible
-					if (CompatibilityUtil.resolveCompatibilityResult(
-							serializationProxy.restoreKeySerializer(),
-							UnloadableDummyTypeSerializer.class,
-							serializationProxy.getKeySerializerConfigSnapshot(),
-							keySerializer)
-						.isRequiresMigration()) {
-
-						// TODO replace with state migration; note that key hash codes need to remain the same after migration
-						throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-							"Aborting now since state migration is currently not available");
+					if (!serializationProxy.getKeySerializerConfigSnapshot()
+							.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+						throw new StateMigrationException("The new key serializer must be compatible.");
 					}
 
 					keySerializerRestored = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 215d7d36c96..d5310412385 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -32,13 +32,13 @@
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
@@ -141,8 +141,8 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro
 			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
 			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (IOException expected) {
-			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} catch (Exception expected) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
 		} finally {
 			stateHandle.discardState();
 		}
@@ -194,8 +194,8 @@ public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws
 						Collections.singleton(StringSerializer.class.getName()))));
 
 			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (IOException expected) {
-			Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed state"));
+		} catch (Exception expected) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ab09557691a..4976b302e83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -889,8 +890,8 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception
 			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
 			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (IOException expected) {
-			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} catch (Exception expected) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
 		} finally {
 			stateHandle.discardState();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 97665518d38..7858b5c74fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -34,6 +33,7 @@
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -72,14 +72,13 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception {
 		}
 
 		serializationProxy =
-				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader(), true);
+				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
-		Assert.assertEquals(keySerializer, serializationProxy.restoreKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 		assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
 	}
@@ -120,21 +119,24 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
 			new KeyedBackendSerializationProxy<>(
 				new ArtificialCNFExceptionThrowingClassLoader(
 					Thread.currentThread().getContextClassLoader(),
-					cnfThrowingSerializerClasses),
-				false);
+					cnfThrowingSerializerClasses));
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
-		Assert.assertTrue(serializationProxy.restoreKeySerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 
 		for (StateMetaInfoSnapshot snapshot : serializationProxy.getStateMetaInfoSnapshots()) {
-			final RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
-			Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
-			Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
+			try {
+				// creating a registered meta info from the snapshot would fail, because the serializer snapshots
+				// cannot create a proper restore serializer
+				new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+			} catch (Exception e) {
+				Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+			}
+
 			Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
 			Assert.assertEquals(stateSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
 		}
@@ -168,7 +170,7 @@ public void testKeyedStateMetaInfoSerialization() throws Exception {
 	}
 
 	@Test
-	public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Exception {
+	public void testKeyedStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
 		String name = "test";
 		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
@@ -198,11 +200,15 @@ public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Excep
 				new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+		try {
+			// creating a registered meta info from the snapshot would fail, because the serializer snapshots
+			// cannot create a proper restore serializer
+			new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+		} catch (Exception e) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+		}
 
-		Assert.assertEquals(name, restoredMetaInfo.getName());
-		Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
-		Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
+		Assert.assertEquals(name, snapshot.getName());
 		Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
 		Assert.assertEquals(stateSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
 	}
@@ -320,7 +326,7 @@ public void testBroadcastStateMetaInfoSerialization() throws Exception {
 	}
 
 	@Test
-	public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Exception {
+	public void testOperatorStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
 		String name = "test";
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
@@ -348,18 +354,22 @@ public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Ex
 			snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+		try {
+			// creating a registered meta info from the snapshot would fail, because the serializer snapshots
+			// cannot create a proper restore serializer
 			new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+		} catch (Exception e) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+		}
 
-		Assert.assertEquals(name, restoredMetaInfo.getName());
-		Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer);
+		Assert.assertEquals(name, snapshot.getName());
 		Assert.assertEquals(
 			stateSerializer.snapshotConfiguration(),
 			snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
 	}
 
 	@Test
-	public void testBroadcastStateMetaInfoReadSerializerFailureResilience() throws Exception {
+	public void testBroadcastStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
 		String broadcastName = "broadcastTest";
 		TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
 		TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
@@ -393,14 +403,17 @@ public void testBroadcastStateMetaInfoReadSerializerFailureResilience() throws E
 			snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+		try {
+			// creating a registered meta info from the snapshot would fail, because the serializer snapshots
+			// cannot create a proper restore serializer
 			new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+		} catch (Exception e) {
+			Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+		}
 
-		Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
-		Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, restoredMetaInfo.getAssignmentMode());
-		Assert.assertTrue(restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
+		Assert.assertEquals(broadcastName, snapshot.getName());
+		Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST.toString(), snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE));
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
-		Assert.assertTrue(restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(valueSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
 	}
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
index 3b8331abdf6..5990857c3cc 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
@@ -21,13 +21,13 @@ package org.apache.flink.api.scala.runtime
 import java.io.InputStream
 
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSerializationUtil, TypeSerializerSnapshot}
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
 import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.core.memory.DataInputViewStreamWrapper
-import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertNotNull, assertTrue}
 import org.junit.Test
 
 /**
@@ -48,8 +48,11 @@ class TupleSerializerCompatibilityTest {
 
       assertEquals(1, deserialized.size)
 
-      val oldSerializer = deserialized.get(0).f0
-      val oldConfigSnapshot = deserialized.get(0).f1
+      val oldSerializer: TypeSerializer[TestCaseClass] =
+        deserialized.get(0).f0.asInstanceOf[TypeSerializer[TestCaseClass]]
+
+      val oldConfigSnapshot: TypeSerializerSnapshot[TestCaseClass] =
+        deserialized.get(0).f1.asInstanceOf[TypeSerializerSnapshot[TestCaseClass]]
 
       // test serializer and config snapshot
       assertNotNull(oldSerializer)
@@ -61,9 +64,9 @@ class TupleSerializerCompatibilityTest {
 
       val currentSerializer = createTypeInformation[TestCaseClass]
         .createSerializer(new ExecutionConfig())
-      assertFalse(currentSerializer
-        .ensureCompatibility(oldConfigSnapshot)
-        .isRequiresMigration)
+      assertTrue(oldConfigSnapshot
+        .resolveSchemaCompatibility(currentSerializer)
+        .isCompatibleAsIs)
 
       // test old data serialization
       is.close()
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 55e2419a97a..f400dc33612 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.typeutils
 import java.io._
 import java.net.{URL, URLClassLoader}
 
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerSnapshotSerializationUtil}
+import org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, TypeSerializerSnapshotSerializationUtil}
 import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
@@ -84,7 +84,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkIdenticalEnums(): Unit = {
-    assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
   }
 
   /**
@@ -92,7 +92,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkAppendedField(): Unit = {
-    assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
   }
 
   /**
@@ -100,7 +100,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkRemovedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
   }
 
   /**
@@ -108,7 +108,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkDifferentFieldOrder(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
   }
 
   /**
@@ -117,12 +117,12 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
   @Test
   def checkDifferentIds(): Unit = {
     assertTrue(
-      "Different ids should cause a migration.",
-      checkCompatibility(enumA, enumE).isRequiresMigration)
+      "Different ids should be incompatible.",
+      checkCompatibility(enumA, enumE).isIncompatible)
   }
 
   def checkCompatibility(enumSourceA: String, enumSourceB: String)
-    : CompatibilityResult[Enumeration#Value] = {
+    : TypeSerializerSchemaCompatibility[Enumeration#Value, _] = {
     import EnumValueSerializerUpgradeTest._
 
     val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
@@ -152,7 +152,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
 
     val enumValueSerializer2 = new EnumValueSerializer(enum2)
-    enumValueSerializer2.ensureCompatibility(snapshot2)
+    snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
   }
 }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a4b4ce80dc2..885d582bbc8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,10 +27,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -724,22 +723,15 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
 			// that the new serializer for states could be compatible, and therefore the restore can continue
 			// without old serializers required to be present.
 			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
+				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
 			serializationProxy.read(currentStateHandleInView);
 
 			// check for key serializer compatibility; this also reconfigures the
 			// key serializer to be compatible, if it is required and is possible
-			if (CompatibilityUtil.resolveCompatibilityResult(
-				serializationProxy.restoreKeySerializer(),
-				UnloadableDummyTypeSerializer.class,
-				serializationProxy.getKeySerializerConfigSnapshot(),
-				rocksDBKeyedStateBackend.keySerializer)
-				.isRequiresMigration()) {
-
-				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-					"Aborting now since state migration is currently not available");
+			if (!serializationProxy.getKeySerializerConfigSnapshot()
+					.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs()) {
+				throw new StateMigrationException("The new key serializer must be compatible.");
 			}
 
 			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
@@ -761,15 +753,12 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
 						nameBytes,
 						rocksDBKeyedStateBackend.columnOptions);
 
-					RegisteredStateMetaInfoBase stateMetaInfo =
-						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-
 					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
 					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
-					rocksDBKeyedStateBackend.registerKvStateInformation(stateMetaInfo.getName(), registeredColumn);
+					registeredColumn = new Tuple2<>(columnFamily, null);
+					rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), registeredColumn);
 
 				} else {
 					// TODO with eager state registration in place, check here for serializer migration strategies
@@ -1079,13 +1068,10 @@ private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
 				stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
 			if (null == registeredStateMetaInfoEntry) {
-				RegisteredStateMetaInfoBase stateMetaInfo =
-					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
-
 				registeredStateMetaInfoEntry =
 					new Tuple2<>(
 						columnFamilyHandle != null ? columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-						stateMetaInfo);
+						null);
 
 				stateBackend.registerKvStateInformation(
 					stateMetaInfoSnapshot.getName(),
@@ -1213,12 +1199,10 @@ private void restoreLocalStateIntoFullInstance(
 				StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
 				ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-				RegisteredStateMetaInfoBase stateMetaInfo =
-					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
 				stateBackend.registerKvStateInformation(
 					stateMetaInfoSnapshot.getName(),
-					new Tuple2<>(columnFamilyHandle, stateMetaInfo));
+					new Tuple2<>(columnFamilyHandle, null));
 			}
 
 			// use the restore sst files as the base for succeeding checkpoints
@@ -1275,22 +1259,15 @@ private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
 				// that the new serializer for states could be compatible, and therefore the restore can continue
 				// without old serializers required to be present.
 				KeyedBackendSerializationProxy<T> serializationProxy =
-					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
+					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
 				// check for key serializer compatibility; this also reconfigures the
 				// key serializer to be compatible, if it is required and is possible
-				if (CompatibilityUtil.resolveCompatibilityResult(
-					serializationProxy.restoreKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					serializationProxy.getKeySerializerConfigSnapshot(),
-					stateBackend.keySerializer)
-					.isRequiresMigration()) {
-
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
-						"Aborting now since state migration is currently not available");
+				if (!serializationProxy.getKeySerializerConfigSnapshot()
+						.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
+					throw new StateMigrationException("The new key serializer must be compatible.");
 				}
 
 				return serializationProxy.getStateMetaInfoSnapshots();
@@ -1622,13 +1599,15 @@ public static RocksIteratorWrapper getRocksIterator(
 			TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
 
 			if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
-				CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					metaInfoTypeSerializer,
-					null,
-					restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-					byteOrderedElementSerializer);
+				@SuppressWarnings("unchecked")
+				TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+					(TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+				TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+					serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-				if (compatibilityResult.isRequiresMigration()) {
+				// TODO implement proper migration for priority queue state
+				if (compatibilityResult.isIncompatible()) {
 					throw new FlinkRuntimeException(StateMigrationException.notSupported());
 				}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index 872a58d2304..f813039ae35 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -290,12 +290,12 @@ public int getCurrentVersion() {
 		}
 
 		@Override
-		public void write(DataOutputView out) throws IOException {
+		public void writeSnapshot(DataOutputView out) throws IOException {
 			out.writeUTF(configPayload);
 		}
 
 		@Override
-		public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 			if (readVersion != 1) {
 				throw new IllegalStateException("Can not recognize read version: " + readVersion);
 			}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services