You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 05:58:57 UTC

[GitHub] tzulitai closed pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

tzulitai closed pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 161e65b3b06..cb6c254f904 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -167,9 +167,9 @@ public boolean canEqual(Object obj) {
 	}
 
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof WritableSerializerConfigSnapshot
-				&& typeClass.equals(((WritableSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
+				&& typeClass.equals(((WritableSerializerConfigSnapshot<?>) configSnapshot).getTypeClass())) {
 
 			return CompatibilityResult.compatible();
 		} else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
new file mode 100644
index 00000000000..49d03db9830
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * A utility {@link TypeSerializerSnapshot} that is used for backwards compatibility purposes.
+ *
+ * <p>In older versions of Flink (&lt;= 1.2), we only wrote serializers and not their corresponding snapshots.
+ * This class serves as a wrapper around the restored serializer instances.
+ *
+ * @param <T> the data type that the wrapped serializer instance serializes.
+ */
+@Internal
+public class BackwardsCompatibleSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
+
+	/**
+	 * The serializer instance written in savepoints.
+	 */
+	@Nonnull
+	private TypeSerializer<T> serializerInstance;
+
+	public BackwardsCompatibleSerializerSnapshot(TypeSerializer<T> serializerInstance) {
+		this.serializerInstance = Preconditions.checkNotNull(serializerInstance);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		throw new UnsupportedOperationException(
+			"This is a dummy config snapshot used only for backwards compatibility.");
+	}
+
+	@Override
+	public void read(int version, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		throw new UnsupportedOperationException(
+			"This is a dummy config snapshot used only for backwards compatibility.");
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		throw new UnsupportedOperationException(
+			"This is a dummy config snapshot used only for backwards compatibility.");
+	}
+
+	@Override
+	public TypeSerializer<T> restoreSerializer() {
+		return serializerInstance;
+	}
+
+	@Override
+	public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer) {
+		// if there is no configuration snapshot to check against,
+		// then we can only assume that the new serializer is compatible as is
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
+	}
+
+	@Override
+	public int hashCode() {
+		return serializerInstance.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		BackwardsCompatibleSerializerSnapshot<?> that = (BackwardsCompatibleSerializerSnapshot<?>) o;
+
+		return that.serializerInstance.equals(serializerInstance);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 6c8583c1968..a188a4d23db 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -23,8 +23,13 @@
 
 /**
  * Utilities related to serializer compatibility.
+ *
+ * @deprecated this utility class still uses the old serializer compatibility interfaces, and
+ *             is therefore deprecated. See {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}
+ *             and {@link TypeSerializerSchemaCompatibility}.
  */
 @Internal
+@Deprecated
 public class CompatibilityUtil {
 
 	/**
@@ -57,10 +62,12 @@
 	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
 			@Nullable TypeSerializer<?> precedingSerializer,
 			Class<?> dummySerializerClassTag,
-			TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
+			TypeSerializerSnapshot<?> precedingSerializerConfigSnapshot,
 			TypeSerializer<T> newSerializer) {
 
-		if (precedingSerializerConfigSnapshot != null) {
+		if (precedingSerializerConfigSnapshot != null
+			&& !(precedingSerializerConfigSnapshot instanceof BackwardsCompatibleSerializerSnapshot)) {
+
 			CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
 
 			if (!initialResult.isRequiresMigration()) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
index 45b78c1a1f0..237c7ce355e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
@@ -38,9 +38,9 @@
  * is required.
  */
 @Internal
-public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+public abstract class CompositeTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 
-	private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersAndConfigs;
+	private List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs;
 
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public CompositeTypeSerializerConfigSnapshot() {}
@@ -50,11 +50,9 @@ public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializ
 
 		this.nestedSerializersAndConfigs = new ArrayList<>(nestedSerializers.length);
 		for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
-			TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration();
+			TypeSerializerSnapshot<?> configSnapshot = nestedSerializer.snapshotConfiguration();
 			this.nestedSerializersAndConfigs.add(
-				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
-					nestedSerializer.duplicate(),
-					Preconditions.checkNotNull(configSnapshot)));
+				new Tuple2<>(nestedSerializer.duplicate(), Preconditions.checkNotNull(configSnapshot)));
 		}
 	}
 
@@ -71,11 +69,11 @@ public void read(DataInputView in) throws IOException {
 			TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader());
 	}
 
-	public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() {
+	public List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> getNestedSerializersAndConfigs() {
 		return nestedSerializersAndConfigs;
 	}
 
-	public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
+	public Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> getSingleNestedSerializerAndConfig() {
 		return nestedSerializersAndConfigs.get(0);
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
index 4edfe123241..ae95bfdda0c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
@@ -31,7 +31,7 @@
  * @param <T> The type to be instantiated.
  */
 @Internal
-public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
+public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 
 	private Class<T> typeClass;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
index 7ba7dd452d6..6fc6d172491 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -29,7 +29,7 @@
  * A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
  */
 @Internal
-public final class ParameterlessTypeSerializerConfig extends TypeSerializerConfigSnapshot {
+public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index fb59602c5f5..72b25146cf2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -130,12 +130,12 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
 	}
 
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
 		throw new UnsupportedOperationException(
 			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
 	}
 
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException(
 			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index a606a181d1d..7a1675eafba 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -175,18 +176,18 @@
 	 * serializer was registered to, the returned configuration snapshot can be used to ensure compatibility
 	 * of the new serializer and determine if state migration is required.
 	 *
-	 * @see TypeSerializerConfigSnapshot
+	 * @see TypeSerializerSnapshot
 	 *
 	 * @return snapshot of the serializer's current configuration (cannot be {@code null}).
 	 */
-	public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+	public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
 
 	/**
 	 * Ensure compatibility of this serializer with a preceding serializer that was registered for serialization of
 	 * the same managed state (if any - this method is only relevant if this serializer is registered for
 	 * serialization of managed state).
 	 *
-	 * The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
+	 * <p>The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
 	 * snapshot. The method may reconfigure the serializer (if required and possible) so that it may be compatible,
 	 * or provide a signaling result that informs Flink that state migration is necessary before continuing to use
 	 * this serializer.
@@ -215,5 +216,32 @@
 	 *
 	 * @return the determined compatibility result (cannot be {@code null}).
 	 */
-	public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+	@Deprecated
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+		throw new IllegalStateException(
+			"Seems like that you are still using TypeSerializerConfigSnapshot; if so, this method must be implemented. " +
+				"Once you change to directly use TypeSerializerSnapshot, then you can safely remove the implementation " +
+				"of this method.");
+	}
+
+	@Internal
+	public final CompatibilityResult<T> ensureCompatibility(TypeSerializerSnapshot<?> configSnapshot) {
+		if (configSnapshot instanceof TypeSerializerConfigSnapshot) {
+			return ensureCompatibility((TypeSerializerConfigSnapshot<?>) configSnapshot);
+		} else {
+			@SuppressWarnings("unchecked")
+			TypeSerializerSnapshot<T> casted = (TypeSerializerSnapshot<T>) configSnapshot;
+
+			TypeSerializerSchemaCompatibility<T, ? extends TypeSerializer<T>> compat = casted.resolveSchemaCompatibility(this);
+			if (compat.isCompatibleAsIs()) {
+				return CompatibilityResult.compatible();
+			} else if (compat.isCompatibleAfterMigration()) {
+				return CompatibilityResult.requiresMigration();
+			} else if (compat.isIncompatible()) {
+				throw new IllegalStateException("The new serializer is incompatible.");
+			} else {
+				throw new IllegalStateException("Unidentifiable schema compatibility type. This is a bug, please file a JIRA.");
+			}
+		}
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 389d141fff0..236b994b295 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -21,16 +21,40 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data in the checkpoint.
+ * This serves three purposes:
+ *
+ * <ul>
+ *   <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a serializer.
+ *   This is explained in more detail below.</li>
+ *
+ *   <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available,
+ *   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
+ *   This is performed by providing the new serializer to the corresponding serializer configuration
+ *   snapshots in checkpoints.</li>
  *
- * <p>The persisted configuration may later on be used by new serializers to ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   <li><strong>Factory for a read serializer when schema conversion is required:</strong> in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion process executed across all data
+ *   is required before the new serializer can continue to be used. This conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, which are then written back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints double as a factory for the read
+ *   serializer of the conversion process.</li>
+ * </ul>
+ *
+ * <h2>Serializer Configuration and Schema</h2>
+ *
+ * <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * <ul>
  *   <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
@@ -38,18 +62,55 @@
  *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
  *   serializers.</li>
  *
- *   <li><strong>Serialization schema of the serializer:</strong> the data format used by the serializer.</li>
+ *   <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.</li>
  * </ul>
  *
  * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this configuration serializes.
+ *
+ * @deprecated This class has been deprecated since Flink 1.7, and will eventually be removed.
+ *             Please refer to, and directly implement a {@link TypeSerializerSnapshot} instead.
+ *             Class-level Javadocs of {@link TypeSerializerSnapshot} provides more details
+ *             on migrating to the new interface.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+@Deprecated
+public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable implements TypeSerializerSnapshot<T> {
 
 	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
 	private ClassLoader userCodeClassLoader;
 
+	/**
+	 * The originating serializer of this configuration snapshot.
+	 */
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Creates a serializer using this configuration, that is capable of reading data
+	 * written by the serializer described by this configuration.
+	 *
+	 * @return the restored serializer.
+	 */
+	public TypeSerializer<T> restoreSerializer() {
+		if (serializer != null) {
+			return this.serializer;
+		} else {
+			throw new IllegalStateException("Trying to restore the prior serializer via TypeSerializerConfigSnapshot, " +
+				"but the prior serializer has not been set.");
+		}
+	}
+
+	/**
+	 * Set the originating serializer of this configuration snapshot.
+	 */
+	@Internal
+	public void setPriorSerializer(TypeSerializer<T> serializer) {
+		this.serializer = Preconditions.checkNotNull(serializer);
+	}
+
 	/**
 	 * Set the user code class loader.
 	 * Only relevant if this configuration instance was deserialized from binary form.
@@ -77,4 +138,23 @@ public final ClassLoader getUserCodeClassLoader() {
 	public abstract boolean equals(Object obj);
 
 	public abstract int hashCode();
+
+	// ----------------------------------------------------------------------------
+	//  Irrelevant methods; these methods should only ever be used when the new interface is directly implemented.
+	// ----------------------------------------------------------------------------
+
+	@Override
+	public int getCurrentVersion() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public final void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public final <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer) {
+		throw new UnsupportedOperationException();
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
new file mode 100644
index 00000000000..6555f9b2058
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@code TypeSerializerSchemaCompatibility} represents information about whether or not a {@link TypeSerializer}
+ * can be safely used to read data written by a previous type serializer.
+ *
+ * <p>Typically, the compatibility of the new serializer is resolved by checking it against the snapshotted
+ * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending on the type of the
+ * resolved compatibility result, migration (i.e., reading bytes with the previous serializer and then writing
+ * it again with the new serializer) may be required before the new serializer can be used.
+ *
+ * @param <T> the type of data serialized by the serializer that was being checked.
+ *
+ * @param <NS> the type of serializer that was being checked.
+ *
+ * @see TypeSerializer
+ * @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
+ */
+@PublicEvolving
+public class TypeSerializerSchemaCompatibility<T, NS extends TypeSerializer<T>> {
+
+	/**
+	 * Enum for the type of the compatibility.
+	 */
+	enum Type {
+
+		/** This indicates that the new serializer can continue to be used as is. */
+		COMPATIBLE_AS_IS,
+
+		/**
+		 * This indicates that it is possible to use the new serializer after performing a
+		 * full-scan migration over all state, by reading bytes with the previous serializer
+		 * and then writing it again with the new serializer, effectively converting the
+		 * serialization schema to correspond to the new serializer.
+		 */
+		COMPATIBLE_AFTER_MIGRATION,
+
+		/**
+		 * This indicates that the new serializer is incompatible, even with migration.
+		 * This normally implies that the deserialized Java class can not be commonly recognized
+		 * by the previous and new serializer.
+		 */
+		INCOMPATIBLE
+	}
+
+	/**
+	 * The type of the compatibility.
+	 */
+	private final Type resultType;
+
+	/**
+	 * Returns a result that indicates that the new serializer is compatible and no migration is required.
+	 * The new serializer can continue to be used as is.
+	 *
+	 * @return a result that indicates migration is not required for the new serializer.
+	 */
+	public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> compatibleAsIs() {
+		return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
+	}
+
+	/**
+	 * Returns a result that indicates that the new serializer can be used after migrating the written bytes, i.e.
+	 * reading it with the old serializer and then writing it again with the new serializer.
+	 *
+	 * @return a result that indicates that the new serializer can be used after migrating the written bytes.
+	 */
+	public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> compatibleAfterMigration() {
+		return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
+	}
+
+	/**
+	 * Returns a result that indicates there is no possible way for the new serializer to be use-able.
+	 * This normally indicates that there is no common Java class between what the previous bytes can be
+	 * deserialized into and what can be written by the new serializer.
+	 *
+	 * <p>In this case, there is no possible way for the new serializer to continue to be used, even with
+	 * migration. Recovery of the Flink job will fail.
+	 *
+	 * @return a result that indicates incompatibility between the new and previous serializer.
+	 */
+	public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> incompatible() {
+		return new TypeSerializerSchemaCompatibility<>(Type.INCOMPATIBLE, null);
+	}
+
+	private TypeSerializerSchemaCompatibility(Type resultType, @Nullable NS reconfiguredNewSerializer) {
+		this.resultType = Preconditions.checkNotNull(resultType);
+	}
+
+	/**
+	 * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_AS_IS}.
+	 *
+	 * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_AS_IS}.
+	 */
+	public boolean isCompatibleAsIs() {
+		return resultType == Type.COMPATIBLE_AS_IS;
+	}
+
+	/**
+	 * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_AFTER_MIGRATION}.
+	 *
+	 * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_AFTER_MIGRATION}.
+	 */
+	public boolean isCompatibleAfterMigration() {
+		return resultType == Type.COMPATIBLE_AFTER_MIGRATION;
+	}
+
+	/**
+	 * Returns whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
+	 *
+	 * @return whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
+	 */
+	public boolean isIncompatible() {
+		return resultType == Type.INCOMPATIBLE;
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index e83b8c71ec3..e7cdc382f60 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -40,9 +40,14 @@
 import java.util.List;
 
 /**
- * Utility methods for serialization of {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
+ * Utility methods for serialization of {@link TypeSerializer}.
+ *
+ * @deprecated This utility class was used to write serializers into checkpoints.
+ *             Starting from Flink 1.6.x, this should no longer happen, and therefore
+ *             this class is deprecated. It remains here for backwards compatibility paths.
  */
 @Internal
+@Deprecated
 public class TypeSerializerSerializationUtil {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
@@ -142,19 +147,20 @@
 	 */
 	public static void writeSerializersAndConfigsWithResilience(
 			DataOutputView out,
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs) throws IOException {
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {
 
 		try (
 			ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
 			DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {
 
 			out.writeInt(serializersAndConfigs.size());
-			for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serAndConfSnapshot : serializersAndConfigs) {
+			for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serAndConfSnapshot : serializersAndConfigs) {
 				out.writeInt(bufferWithPos.getPosition());
 				writeSerializer(bufferWrapper, serAndConfSnapshot.f0);
 
 				out.writeInt(bufferWithPos.getPosition());
-				writeSerializerConfigSnapshot(bufferWrapper, serAndConfSnapshot.f1);
+				TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+					bufferWrapper, (TypeSerializerSnapshot) serAndConfSnapshot.f1, serAndConfSnapshot.f0);
 			}
 
 			out.writeInt(bufferWithPos.getPosition());
@@ -176,7 +182,7 @@ public static void writeSerializersAndConfigsWithResilience(
 	 *
 	 * @throws IOException
 	 */
-	public static List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> readSerializersAndConfigsWithResilience(
+	public static List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> readSerializersAndConfigsWithResilience(
 			DataInputView in,
 			ClassLoader userCodeClassLoader) throws IOException {
 
@@ -193,11 +199,11 @@ public static void writeSerializersAndConfigsWithResilience(
 		byte[] buffer = new byte[totalBytes];
 		in.readFully(buffer);
 
-		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigSnapshots =
+		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigSnapshots =
 			new ArrayList<>(numSerializersAndConfigSnapshots);
 
 		TypeSerializer<?> serializer;
-		TypeSerializerConfigSnapshot configSnapshot;
+		TypeSerializerSnapshot<?> configSnapshot;
 		try (
 			ByteArrayInputStreamWithPos bufferWithPos = new ByteArrayInputStreamWithPos(buffer);
 			DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(bufferWithPos)) {
@@ -208,105 +214,17 @@ public static void writeSerializersAndConfigsWithResilience(
 				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
 
 				bufferWithPos.setPosition(offsets[i * 2 + 1]);
-				configSnapshot = readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
 
-				serializersAndConfigSnapshots.add(
-					new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(serializer, configSnapshot));
+				configSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+						bufferWrapper, userCodeClassLoader, serializer);
+
+				serializersAndConfigSnapshots.add(new Tuple2<>(serializer, configSnapshot));
 			}
 		}
 
 		return serializersAndConfigSnapshots;
 	}
 
-	/**
-	 * Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view.
-	 *
-	 * <p>It is written with a format that can be later read again using
-	 * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
-	 *
-	 * @param out the data output view
-	 * @param serializerConfigSnapshot the serializer configuration snapshot to write
-	 *
-	 * @throws IOException
-	 */
-	public static void writeSerializerConfigSnapshot(
-			DataOutputView out,
-			TypeSerializerConfigSnapshot serializerConfigSnapshot) throws IOException {
-
-		new TypeSerializerConfigSnapshotSerializationProxy(serializerConfigSnapshot).write(out);
-	}
-
-	/**
-	 * Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously
-	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
-	 *
-	 * @param in the data input view
-	 * @param userCodeClassLoader the user code class loader to use
-	 *
-	 * @return the read serializer configuration snapshot
-	 *
-	 * @throws IOException
-	 */
-	public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
-			DataInputView in,
-			ClassLoader userCodeClassLoader) throws IOException {
-
-		final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
-		proxy.read(in);
-
-		return proxy.getSerializerConfigSnapshot();
-	}
-
-	/**
-	 * Writes multiple {@link TypeSerializerConfigSnapshot}s to the provided data output view.
-	 *
-	 * <p>It is written with a format that can be later read again using
-	 * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
-	 *
-	 * @param out the data output view
-	 * @param serializerConfigSnapshots the serializer configuration snapshots to write
-	 *
-	 * @throws IOException
-	 */
-	public static void writeSerializerConfigSnapshots(
-			DataOutputView out,
-			TypeSerializerConfigSnapshot... serializerConfigSnapshots) throws IOException {
-
-		out.writeInt(serializerConfigSnapshots.length);
-
-		for (TypeSerializerConfigSnapshot snapshot : serializerConfigSnapshots) {
-			new TypeSerializerConfigSnapshotSerializationProxy(snapshot).write(out);
-		}
-	}
-
-	/**
-	 * Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously
-	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
-	 *
-	 * @param in the data input view
-	 * @param userCodeClassLoader the user code class loader to use
-	 *
-	 * @return the read serializer configuration snapshots
-	 *
-	 * @throws IOException
-	 */
-	public static TypeSerializerConfigSnapshot[] readSerializerConfigSnapshots(
-			DataInputView in,
-			ClassLoader userCodeClassLoader) throws IOException {
-
-		int numFields = in.readInt();
-		final TypeSerializerConfigSnapshot[] serializerConfigSnapshots = new TypeSerializerConfigSnapshot[numFields];
-
-		TypeSerializerConfigSnapshotSerializationProxy proxy;
-		for (int i = 0; i < numFields; i++) {
-			proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
-			proxy.read(in);
-			serializerConfigSnapshots[i] = proxy.getSerializerConfigSnapshot();
-		}
-
-		return serializerConfigSnapshots;
-	}
-
 	// -----------------------------------------------------------------------------------------------------
 
 	/**
@@ -383,65 +301,4 @@ public int getVersion() {
 			return VERSION;
 		}
 	}
-
-	/**
-	 * Utility serialization proxy for a {@link TypeSerializerConfigSnapshot}.
-	 */
-	static final class TypeSerializerConfigSnapshotSerializationProxy extends VersionedIOReadableWritable {
-
-		private static final int VERSION = 1;
-
-		private ClassLoader userCodeClassLoader;
-		private TypeSerializerConfigSnapshot serializerConfigSnapshot;
-
-		TypeSerializerConfigSnapshotSerializationProxy(ClassLoader userCodeClassLoader) {
-			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
-		}
-
-		TypeSerializerConfigSnapshotSerializationProxy(TypeSerializerConfigSnapshot serializerConfigSnapshot) {
-			this.serializerConfigSnapshot = serializerConfigSnapshot;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-
-			// config snapshot class, so that we can re-instantiate the
-			// correct type of config snapshot instance when deserializing
-			out.writeUTF(serializerConfigSnapshot.getClass().getName());
-
-			// the actual configuration parameters
-			serializerConfigSnapshot.write(out);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-
-			String serializerConfigClassname = in.readUTF();
-			Class<? extends TypeSerializerConfigSnapshot> serializerConfigSnapshotClass;
-			try {
-				serializerConfigSnapshotClass = (Class<? extends TypeSerializerConfigSnapshot>)
-					Class.forName(serializerConfigClassname, true, userCodeClassLoader);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(
-					"Could not find requested TypeSerializerConfigSnapshot class "
-						+ serializerConfigClassname +  " in classpath.", e);
-			}
-
-			serializerConfigSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-			serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
-			serializerConfigSnapshot.read(in);
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
-			return serializerConfigSnapshot;
-		}
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
new file mode 100644
index 00000000000..067d43d480d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data in the checkpoint.
+ * This serves three purposes:
+ *
+ * <ul>
+ *   <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a serializer.
+ *   This is explained in more detail below.</li>
+ *
+ *   <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available,
+ *   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
+ *   This is performed by providing the new serializer to the corresponding serializer configuration
+ *   snapshots in checkpoints.</li>
+ *
+ *   <li><strong>Factory for a read serializer when schema conversion is required:</strong> in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion process executed across all data
+ *   is required before the new serializer can continue to be used. This conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, which are then written back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints double as a factory for the read
+ *   serializer of the conversion process.</li>
+ * </ul>
+ *
+ * <h2>Serializer Configuration and Schema</h2>
+ *
+ * <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible read serializers, the configuration
+ * snapshot should encode sufficient information about:
+ *
+ * <ul>
+ *   <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
+ *   required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
+ *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
+ *   serializers.</li>
+ *
+ *   <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.</li>
+ * </ul>
+ *
+ * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
+ * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this configuration serializes.
+ */
+@PublicEvolving
+public interface TypeSerializerSnapshot<T> {
+
+	/**
+	 * Returns the version of the current snapshot's written binary format.
+	 *
+	 * @return the version of the current snapshot's written binary format.
+	 */
+	int getCurrentVersion();
+
+	/**
+	 * Writes the serializer snapshot to the provided {@link DataOutputView}.
+	 * The current version of the written serializer snapshot's binary format
+	 * is specified by the {@link #getCurrentVersion()} method.
+	 *
+	 * @param out the {@link DataOutputView} to write the snapshot to.
+	 *
+	 * @throws IOException
+	 */
+	void write(DataOutputView out) throws IOException;
+
+	/**
+	 * Reads the serializer snapshot from the provided {@link DataInputView}.
+	 * The version of the binary format that the serializer snapshot was written
+	 * with is provided. This version can be used to determine how the serializer
+	 * snapshot should be read.
+	 *
+	 * @param readVersion version of the serializer snapshot's written binary format
+	 * @param in the {@link DataInputView} to read the snapshot from.
+	 * @param userCodeClassLoader the user code classloader
+	 *
+	 * @throws IOException
+	 */
+	void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
+
+	/**
+	 * Recreates a serializer instance from this snapshot. The returned
+	 * serializer can be safely used to read data written by the prior serializer
+	 * (i.e., the serializer that created this snapshot).
+	 *
+	 * @return a serializer instance restored from this serializer snapshot.
+	 */
+	TypeSerializer<T> restoreSerializer();
+
+	/**
+	 * Checks a new serializer's compatibility to read data written by the prior
+	 * serializer.
+	 *
+	 * @param newSerializer the new serializer to check.
+	 * @param <NS> the type of the new serializer
+	 *
+	 * @return the serializer compatibility result.
+	 */
+	<NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer);
+
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
new file mode 100644
index 00000000000..0bcff93b802
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerSnapshot}.
+ */
+public class TypeSerializerSnapshotSerializationUtil {
+
+	/**
+	 * Writes a {@link TypeSerializerSnapshot} to the provided data output view.
+	 *
+	 * <p>It is written with a format that can be later read again using
+	 * {@link #readSerializerSnapshot(DataInputView, ClassLoader, TypeSerializer)}.
+	 *
+	 * @param out the data output view
+	 * @param serializerSnapshot the serializer configuration snapshot to write
+	 * @param serializer the prior serializer. This needs to be written as part of the serializer snapshot
+	 *                   if the serializer snapshot is still the legacy {@link TypeSerializerConfigSnapshot}.
+	 *
+	 * @throws IOException
+	 */
+	public static <T> void writeSerializerSnapshot(
+		DataOutputView out,
+		TypeSerializerSnapshot<T> serializerSnapshot,
+		TypeSerializer<T> serializer) throws IOException {
+
+		new TypeSerializerSnapshotSerializationProxy<>(serializerSnapshot, serializer).write(out);
+	}
+
+	/**
+	 * Reads from a data input view a {@link TypeSerializerSnapshot} that was previously
+	 * written using {@link TypeSerializerSnapshotSerializationUtil#writeSerializerSnapshot(DataOutputView, TypeSerializerSnapshot, TypeSerializer)}.
+	 *
+	 * @param in the data input view
+	 * @param userCodeClassLoader the user code class loader to use
+	 * @param existingPriorSerializer the prior serializer. This would only be non-null if we are
+	 *                                restoring from a snapshot taken with Flink version &lt;= 1.6.
+	 *
+	 * @return the read serializer configuration snapshot
+	 *
+	 * @throws IOException
+	 */
+	public static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
+			DataInputView in,
+			ClassLoader userCodeClassLoader,
+			@Nullable TypeSerializer<T> existingPriorSerializer) throws IOException {
+
+		final TypeSerializerSnapshotSerializationProxy<T> proxy =
+			new TypeSerializerSnapshotSerializationProxy<>(userCodeClassLoader, existingPriorSerializer);
+		proxy.read(in);
+
+		return proxy.getSerializerSnapshot();
+	}
+
+	/**
+	 * Utility serialization proxy for a {@link TypeSerializerSnapshot}.
+	 */
+	static final class TypeSerializerSnapshotSerializationProxy<T> extends VersionedIOReadableWritable {
+
+		private static final int VERSION = 2;
+
+		private ClassLoader userCodeClassLoader;
+		private TypeSerializerSnapshot<T> serializerSnapshot;
+		private TypeSerializer<T> serializer;
+
+		TypeSerializerSnapshotSerializationProxy(
+			ClassLoader userCodeClassLoader,
+			@Nullable TypeSerializer<T> existingPriorSerializer) {
+			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+			this.serializer = existingPriorSerializer;
+		}
+
+		TypeSerializerSnapshotSerializationProxy(
+			TypeSerializerSnapshot<T> serializerConfigSnapshot,
+			TypeSerializer<T> serializer) {
+			this.serializerSnapshot = Preconditions.checkNotNull(serializerConfigSnapshot);
+			this.serializer = Preconditions.checkNotNull(serializer);
+		}
+
+		/**
+		 * Binary format layout of a written serializer snapshot is as follows:
+		 *
+		 * <ul>
+		 *     <li>1. Serializer snapshot classname (UTF).</li>
+		 *     <li>2. The originating serializer of the snapshot, if any, written via Java serialization.
+		 *         Presence of the serializer is indicated by a flag ({@code boolean -> TypeSerializer}).</li>
+		 *     <li>3. The version of the serializer snapshot's binary format.</li>
+		 *     <li>4. The actual serializer snapshot.</li>
+		 * </ul>
+		 */
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			// config snapshot class, so that we can re-instantiate the
+			// correct type of config snapshot instance when deserializing
+			out.writeUTF(serializerSnapshot.getClass().getName());
+
+			if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
+				// backwards compatible path, where the serializer snapshot is still using the
+				// deprecated interface; the originating serializer needs to be written to the byte stream
+				out.writeBoolean(true);
+				@SuppressWarnings("unchecked")
+				TypeSerializerConfigSnapshot<T> legacySerializerSnapshot = (TypeSerializerConfigSnapshot<T>) serializerSnapshot;
+				TypeSerializerSerializationUtil.writeSerializer(out, serializer);
+
+				// TypeSerializerConfigSnapshot includes the version number implicitly when it is written
+				legacySerializerSnapshot.write(out);
+			} else {
+				out.writeBoolean(false);
+
+				out.writeInt(serializerSnapshot.getCurrentVersion());
+				serializerSnapshot.write(out);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			String serializerConfigClassname = in.readUTF();
+			Class<? extends TypeSerializerSnapshot> serializerConfigSnapshotClass;
+			try {
+				serializerConfigSnapshotClass = (Class<? extends TypeSerializerSnapshot>)
+					Class.forName(serializerConfigClassname, false, userCodeClassLoader);
+			} catch (ClassNotFoundException e) {
+				throw new IOException(
+					"Could not find requested TypeSerializerConfigSnapshot class "
+						+ serializerConfigClassname +  " in classpath.", e);
+			}
+
+			serializerSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
+
+			if (getReadVersion() >= 2) {
+				// Flink version after 1.7
+
+				boolean containsPriorSerializer = in.readBoolean();
+
+				TypeSerializer<T> priorSerializer = (containsPriorSerializer)
+					? TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)
+					: null;
+
+				if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
+					if (priorSerializer != null) {
+						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setPriorSerializer(priorSerializer);
+						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
+						((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
+					} else {
+						// this occurs if the user changed a TypeSerializerSnapshot to the
+						// legacy TypeSerializerConfigSnapshot, which isn't supported.
+						throw new IOException("Cannot read a legacy TypeSerializerConfigSnapshot without the prior serializer present. ");
+					}
+				} else {
+					int readVersion = in.readInt();
+					serializerSnapshot.read(readVersion, in, userCodeClassLoader);
+				}
+			} else {
+				// Flink version before 1.7.x, and after 1.3.x
+
+				if (serializerSnapshot instanceof TypeSerializerConfigSnapshot) {
+					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setPriorSerializer(this.serializer);
+					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
+					((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
+				} else {
+					int readVersion = in.readInt();
+					serializerSnapshot.read(readVersion, in, userCodeClassLoader);
+				}
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public int[] getCompatibleVersions() {
+			return new int[]{VERSION, 1};
+		}
+
+		TypeSerializerSnapshot<T> getSerializerSnapshot() {
+			return serializerSnapshot;
+		}
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
index ddfeab41c1d..448c53b209e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -94,12 +94,12 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
 		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
 	}
 
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 55729852ed1..d1a3e9508ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -22,13 +22,16 @@
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.Collection;
+
 /**
  * Configuration snapshot of a serializer for collection types.
  *
  * @param <T> Type of the element.
  */
 @Internal
-public final class CollectionSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T>
+		extends CompositeTypeSerializerConfigSnapshot<C> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c40fefc1694..dcd26734f24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -178,7 +178,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof EnumSerializerConfigSnapshot) {
 			final EnumSerializerConfigSnapshot<T> config = (EnumSerializerConfigSnapshot<T>) configSnapshot;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index cdfc964e15d..a42226cd911 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -198,12 +198,12 @@ public String toString() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public GenericArraySerializerConfigSnapshot snapshotConfiguration() {
+	public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
 		return new GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer);
 	}
 
 	@Override
-	public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof GenericArraySerializerConfigSnapshot) {
 			final GenericArraySerializerConfigSnapshot config = (GenericArraySerializerConfigSnapshot) configSnapshot;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 70e52106b48..5b207ddbb25 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -36,7 +36,7 @@
  * @param <C> The component type.
  */
 @Internal
-public final class GenericArraySerializerConfigSnapshot<C> extends CompositeTypeSerializerConfigSnapshot {
+public final class GenericArraySerializerConfigSnapshot<C> extends CompositeTypeSerializerConfigSnapshot<C[]> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index c2b935c82e1..67de6e0c3bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -179,15 +180,15 @@ public int hashCode() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public CollectionSerializerConfigSnapshot snapshotConfiguration() {
+	public CollectionSerializerConfigSnapshot<List<T>, T> snapshotConfiguration() {
 		return new CollectionSerializerConfigSnapshot<>(elementSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<List<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+			Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousElemSerializerAndConfig =
+				((CollectionSerializerConfigSnapshot<?, ?>) configSnapshot).getSingleNestedSerializerAndConfig();
 
 			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
 					previousElemSerializerAndConfig.f0,
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 6471152b0bc..2f2f556ed84 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -207,15 +208,15 @@ public int hashCode() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public MapSerializerConfigSnapshot snapshotConfiguration() {
+	public MapSerializerConfigSnapshot<K, V> snapshotConfiguration() {
 		return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer);
 	}
 
 	@Override
-	public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof MapSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-				((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousKvSerializersAndConfigs =
+				((MapSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 			CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 					previousKvSerializersAndConfigs.get(0).f0,
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 9db3019ad46..e601d4d52cc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -22,12 +22,14 @@
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.Map;
+
 /**
  * Configuration snapshot for serializers of maps, containing the
  * configuration snapshot of its key serializer and value serializer.
  */
 @Internal
-public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
+public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot<Map<K, V>> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 9354af07e18..13e41675859 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -53,16 +53,16 @@ public boolean equals(Object obj) {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
 		// type serializer singletons should always be parameter-less
-		return new ParameterlessTypeSerializerConfig(getSerializationFormatIdentifier());
+		return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
 	}
 
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof ParameterlessTypeSerializerConfig
 				&& isCompatibleSerializationFormatIdentifier(
-						((ParameterlessTypeSerializerConfig) configSnapshot).getSerializationFormatIdentifier())) {
+						((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier())) {
 
 			return CompatibilityResult.compatible();
 		} else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index b9039690f9f..68e4af3ea44 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -142,9 +142,9 @@ public boolean canEqual(Object obj) {
 	}
 
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof CopyableValueSerializerConfigSnapshot
-				&& valueClass.equals(((CopyableValueSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
+				&& valueClass.equals(((CopyableValueSerializerConfigSnapshot<?>) configSnapshot).getTypeClass())) {
 			return CompatibilityResult.compatible();
 		} else {
 			return CompatibilityResult.requiresMigration();
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 18ebcd8b63c..890780ad762 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -196,15 +197,15 @@ public int hashCode() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public EitherSerializerConfigSnapshot snapshotConfiguration() {
+	public EitherSerializerConfigSnapshot<L, R> snapshotConfiguration() {
 		return new EitherSerializerConfigSnapshot<>(leftSerializer, rightSerializer);
 	}
 
 	@Override
-	public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof EitherSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousLeftRightSerializersAndConfigs =
-				((EitherSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousLeftRightSerializersAndConfigs =
+				((EitherSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 			CompatibilityResult<L> leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 					previousLeftRightSerializersAndConfigs.get(0).f0,
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index f9968781a8c..628fff41107 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -28,7 +28,7 @@
  * containing configuration snapshots of the Left and Right serializers.
  */
 @Internal
-public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSerializerConfigSnapshot {
+public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSerializerConfigSnapshot<Either<L, R>> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d086d6a74..a4d3839f66f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -36,8 +36,10 @@
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -594,7 +596,7 @@ public boolean canEqual(Object obj) {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
 			final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot;
 
@@ -614,7 +616,7 @@ public boolean canEqual(Object obj) {
 						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
 
 					int i = 0;
-					for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
+					for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> fieldToConfigSnapshotEntry
 							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
@@ -652,7 +654,7 @@ public boolean canEqual(Object obj) {
 					final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags;
 					final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers;
 
-					final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousRegistrations =
+					final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousRegistrations =
 						config.getRegisteredSubclassesToSerializerConfigSnapshots();
 
 					// the reconfigured list of registered subclasses will be the previous registered
@@ -668,7 +670,7 @@ public boolean canEqual(Object obj) {
 						reorderedRegisteredSubclasses, executionConfig);
 
 					i = 0;
-					for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousRegisteredSerializerConfig : previousRegistrations.values()) {
+					for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousRegisteredSerializerConfig : previousRegistrations.values()) {
 						// check compatibility of subclass serializer
 						compatResult = CompatibilityUtil.resolveCompatibilityResult(
 								previousRegisteredSerializerConfig.f0,
@@ -693,7 +695,7 @@ public boolean canEqual(Object obj) {
 					// this won't be applied to this serializer until all compatibility checks have been completed
 					HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>();
 
-					for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousCachedEntry
+					for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousCachedEntry
 							: config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
 
 						TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
@@ -759,7 +761,7 @@ public boolean canEqual(Object obj) {
 		 * may reorder the fields in case they are different. The order of the fields need to
 		 * stay the same for binary compatibility, as the field order is part of the serialization format.
 		 */
-		private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
+		private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> fieldToSerializerConfigSnapshot;
 
 		/**
 		 * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots.
@@ -768,7 +770,7 @@ public boolean canEqual(Object obj) {
 		 * may retain the same class tag used for registered subclasses. Newly registered subclasses that
 		 * weren't present before should be appended with the next available class tag.
 		 */
-		private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots;
+		private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> registeredSubclassesToSerializerConfigSnapshots;
 
 		/**
 		 * Previously cached non-registered subclass serializers and its configuration snapshots.
@@ -776,7 +778,7 @@ public boolean canEqual(Object obj) {
 		 * <p>This is kept so that new Pojo serializers may eagerly repopulate their
 		 * cache with reconfigured subclass serializers.
 		 */
-		private HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots;
+		private HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nonRegisteredSubclassesToSerializerConfigSnapshots;
 
 		private boolean ignoreTypeSerializerSerialization;
 
@@ -785,9 +787,9 @@ public PojoSerializerConfigSnapshot() {}
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
-				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
-				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> registeredSubclassesToSerializerConfigSnapshots,
+				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
 			this(
 				pojoType,
@@ -799,9 +801,9 @@ public PojoSerializerConfigSnapshot(
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
-				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
-				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> registeredSubclassesToSerializerConfigSnapshots,
+				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nonRegisteredSubclassesToSerializerConfigSnapshots,
 				boolean ignoreTypeSerializerSerialization) {
 
 			super(pojoType);
@@ -827,7 +829,7 @@ public void write(DataOutputView out) throws IOException {
 				// --- write fields and their serializers, in order
 
 				out.writeInt(fieldToSerializerConfigSnapshot.size());
-				for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
 						: fieldToSerializerConfigSnapshot.entrySet()) {
 
 					outViewWrapper.writeUTF(entry.getKey());
@@ -838,13 +840,14 @@ public void write(DataOutputView out) throws IOException {
 					}
 
 					out.writeInt(outWithPos.getPosition());
-					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+					TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+						outViewWrapper, (TypeSerializerSnapshot) entry.getValue().f1, entry.getValue().f0);
 				}
 
 				// --- write registered subclasses and their serializers, in registration order
 
 				out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
-				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
 						: registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
 
 					outViewWrapper.writeUTF(entry.getKey().getName());
@@ -855,13 +858,14 @@ public void write(DataOutputView out) throws IOException {
 					}
 
 					out.writeInt(outWithPos.getPosition());
-					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+					TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+						outViewWrapper, (TypeSerializerSnapshot) entry.getValue().f1, entry.getValue().f0);
 				}
 
 				// --- write snapshot of non-registered subclass serializer cache
 
 				out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
-				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
 						: nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
 
 					outViewWrapper.writeUTF(entry.getKey().getName());
@@ -872,7 +876,8 @@ public void write(DataOutputView out) throws IOException {
 					}
 
 					out.writeInt(outWithPos.getPosition());
-					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+					TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+						outViewWrapper, (TypeSerializerSnapshot) entry.getValue().f1, entry.getValue().f0);
 				}
 
 				out.writeInt(outWithPos.getPosition());
@@ -919,7 +924,7 @@ public void read(DataInputView in) throws IOException {
 				this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
 				String fieldName;
 				TypeSerializer<?> fieldSerializer;
-				TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
+				TypeSerializerSnapshot fieldSerializerConfigSnapshot;
 				for (int i = 0; i < numFields; i++) {
 					fieldName = inViewWrapper.readUTF();
 
@@ -927,11 +932,12 @@ public void read(DataInputView in) throws IOException {
 					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-					fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+					fieldSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+						inViewWrapper, getUserCodeClassLoader(), fieldSerializer);
 
 					fieldToSerializerConfigSnapshot.put(
 						fieldName,
-						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
+						new Tuple2<>(fieldSerializer, fieldSerializerConfigSnapshot));
 				}
 
 				// --- read registered subclasses and their serializers, in registration order
@@ -940,7 +946,7 @@ public void read(DataInputView in) throws IOException {
 				String registeredSubclassname;
 				Class<?> registeredSubclass;
 				TypeSerializer<?> registeredSubclassSerializer;
-				TypeSerializerConfigSnapshot registeredSubclassSerializerConfigSnapshot;
+				TypeSerializerSnapshot registeredSubclassSerializerConfigSnapshot;
 				for (int i = 0; i < numRegisteredSubclasses; i++) {
 					registeredSubclassname = inViewWrapper.readUTF();
 					try {
@@ -953,11 +959,12 @@ public void read(DataInputView in) throws IOException {
 					registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]);
-					registeredSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+					registeredSubclassSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+						inViewWrapper, getUserCodeClassLoader(), registeredSubclassSerializer);
 
 					this.registeredSubclassesToSerializerConfigSnapshots.put(
 						registeredSubclass,
-						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(registeredSubclassSerializer, registeredSubclassSerializerConfigSnapshot));
+						new Tuple2<>(registeredSubclassSerializer, registeredSubclassSerializerConfigSnapshot));
 				}
 
 				// --- read snapshot of non-registered subclass serializer cache
@@ -966,7 +973,7 @@ public void read(DataInputView in) throws IOException {
 				String cachedSubclassname;
 				Class<?> cachedSubclass;
 				TypeSerializer<?> cachedSubclassSerializer;
-				TypeSerializerConfigSnapshot cachedSubclassSerializerConfigSnapshot;
+				TypeSerializerSnapshot cachedSubclassSerializerConfigSnapshot;
 				for (int i = 0; i < numCachedSubclassSerializers; i++) {
 					cachedSubclassname = inViewWrapper.readUTF();
 					try {
@@ -979,11 +986,12 @@ public void read(DataInputView in) throws IOException {
 					cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]);
-					cachedSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+					cachedSubclassSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+						inViewWrapper, getUserCodeClassLoader(), cachedSubclassSerializer);
 
 					this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
 						cachedSubclass,
-						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(cachedSubclassSerializer, cachedSubclassSerializerConfigSnapshot));
+						new Tuple2<>(cachedSubclassSerializer, cachedSubclassSerializerConfigSnapshot));
 				}
 			}
 		}
@@ -993,15 +1001,15 @@ public int getVersion() {
 			return VERSION;
 		}
 
-		public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
+		public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> getFieldToSerializerConfigSnapshot() {
 			return fieldToSerializerConfigSnapshot;
 		}
 
-		public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getRegisteredSubclassesToSerializerConfigSnapshots() {
+		public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> getRegisteredSubclassesToSerializerConfigSnapshots() {
 			return registeredSubclassesToSerializerConfigSnapshots;
 		}
 
-		public HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+		public HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
 			return nonRegisteredSubclassesToSerializerConfigSnapshots;
 		}
 
@@ -1170,35 +1178,33 @@ private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
 			TypeSerializer<?>[] fieldSerializers,
 			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
 
-		final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
+		final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> fieldToSerializerConfigSnapshots =
 			new LinkedHashMap<>(fields.length);
 
 		for (int i = 0; i < fields.length; i++) {
 			fieldToSerializerConfigSnapshots.put(
 				fields[i].getName(),
-				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
+				new Tuple2<>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
 		}
 
-		final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots =
+		final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> registeredSubclassesToSerializerConfigSnapshots =
 				new LinkedHashMap<>(registeredSubclassesToTags.size());
 
 		for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) {
 			registeredSubclassesToSerializerConfigSnapshots.put(
 					entry.getKey(),
-					new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+					new Tuple2<>(
 						registeredSubclassSerializers[entry.getValue()],
 						registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()));
 		}
 
-		final HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots =
+		final HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nonRegisteredSubclassesToSerializerConfigSnapshots =
 				new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
 
 		for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) {
 			nonRegisteredSubclassesToSerializerConfigSnapshots.put(
 				entry.getKey(),
-				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
-					entry.getValue(),
-					entry.getValue().snapshotConfiguration()));
+				new Tuple2<>(entry.getValue(), entry.getValue().snapshotConfiguration()));
 		}
 
 		return new PojoSerializerConfigSnapshot<>(
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 7f9cc2145be..e29f6813254 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -268,9 +269,9 @@ public RowSerializerConfigSnapshot snapshotConfiguration() {
 	}
 
 	@Override
-	public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousFieldSerializersAndConfigs =
 				((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
 
 			if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
@@ -279,7 +280,7 @@ public RowSerializerConfigSnapshot snapshotConfiguration() {
 
 				CompatibilityResult<?> compatResult;
 				int i = 0;
-				for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+				for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> f : previousFieldSerializersAndConfigs) {
 					compatResult = CompatibilityUtil.resolveCompatibilityResult(
 							f.f0,
 							UnloadableDummyTypeSerializer.class,
@@ -312,7 +313,7 @@ public RowSerializerConfigSnapshot snapshotConfiguration() {
 		return CompatibilityResult.requiresMigration();
 	}
 
-	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot<Row> {
 
 		private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 3fb7defc2c9..34146ca8e02 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -135,7 +135,7 @@ public boolean canEqual(Object obj) {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof TupleSerializerConfigSnapshot) {
 			final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index eac5200da9c..a1180aeba53 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -32,7 +32,7 @@
  * Snapshot of a tuple serializer's configuration.
  */
 @Internal
-public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<T> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 0a028ebd79d..9e02c359014 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -182,7 +182,7 @@ public boolean canEqual(Object obj) {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof ValueSerializerConfigSnapshot) {
 			final ValueSerializerConfigSnapshot<T> config = (ValueSerializerConfigSnapshot<T>) configSnapshot;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index d8158aabb92..5ac914e03ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -463,7 +463,7 @@ private void checkKryoInitialized() {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
 			final KryoSerializerConfigSnapshot<T> config = (KryoSerializerConfigSnapshot<T>) configSnapshot;
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c78be0..30dd71ac2ce 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -98,33 +98,33 @@ public void testInstantiate() {
 
 	@Test
 	public void testConfigSnapshotInstantiation() {
-		TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
+		TypeSerializerSnapshot<T> configSnapshot = getSerializer().snapshotConfiguration();
 
 		InstantiationUtil.instantiate(configSnapshot.getClass());
 	}
 
 	@Test
 	public void testSnapshotConfigurationAndReconfigure() throws Exception {
-		final TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
+		final TypeSerializerSnapshot<T> configSnapshot = getSerializer().snapshotConfiguration();
 
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
-				new DataOutputViewStreamWrapper(out), configSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), configSnapshot, getSerializer());
 			serializedConfig = out.toByteArray();
 		}
 
-		TypeSerializerConfigSnapshot restoredConfig;
+		TypeSerializerSnapshot<T> restoredConfig;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			restoredConfig = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), getSerializer());
 		}
 
 		CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
 		assertFalse(strategy.isRequiresMigration());
 
 		// also verify that the serializer's reconfigure implementation detects incompatibility
-		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
+		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot<>());
 		assertTrue(strategy.isRequiresMigration());
 	}
 	
@@ -526,7 +526,7 @@ public void skipBytesToRead(int numBytes) throws IOException {
 		}
 	}
 
-	public static final class TestIncompatibleSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+	public static final class TestIncompatibleSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 		@Override
 		public int getVersion() {
 			return 0;
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 04739314dcc..abc0c25b45e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -148,53 +149,49 @@ public void testSerializerSerializationWithInvalidClass() throws Exception {
 	 */
 	@Test
 	public void testSerializeConfigurationSnapshots() throws Exception {
-		TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot1 =
-			new TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo");
-
-		TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot2 =
-			new TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar");
+		TypeSerializerSerializationUtilTest.TestConfigSnapshot<String> configSnapshot1 =
+			new TypeSerializerSerializationUtilTest.TestConfigSnapshot<>(1, "foo");
 
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshots(
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
 				new DataOutputViewStreamWrapper(out),
 				configSnapshot1,
-				configSnapshot2);
+				StringSerializer.INSTANCE);
 
 			serializedConfig = out.toByteArray();
 		}
 
-		TypeSerializerConfigSnapshot[] restoredConfigs;
+		TypeSerializerSnapshot<?> restoredConfigs;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfigs = TypeSerializerSerializationUtil.readSerializerConfigSnapshots(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			restoredConfigs = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), null);
 		}
 
-		assertEquals(2, restoredConfigs.length);
-		assertEquals(configSnapshot1, restoredConfigs[0]);
-		assertEquals(configSnapshot2, restoredConfigs[1]);
+		assertEquals(configSnapshot1, restoredConfigs);
 	}
 
 	/**
 	 * Verifies that deserializing config snapshots fail if the config class could not be found.
 	 */
-	@Test
+	@Test(expected = IOException.class)
 	public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
-				new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar"));
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out),
+				new TypeSerializerSerializationUtilTest.TestConfigSnapshot<>(123, "foobar"),
+				StringSerializer.INSTANCE);
 			serializedConfig = out.toByteArray();
 		}
 
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
 			// read using a dummy classloader
-			TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
-			fail("Expected a ClassNotFoundException wrapped in IOException");
-		} catch (IOException expected) {
-			// test passes
+			TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), null);
 		}
+
+		fail("Expected a ClassNotFoundException wrapped in IOException");
 	}
 
 	/**
@@ -203,10 +200,10 @@ public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
 	 */
 	@Test
 	public void testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures() throws Exception {
-		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = Arrays.asList(
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs = Arrays.asList(
+			new Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>(
 				IntSerializer.INSTANCE, IntSerializer.INSTANCE.snapshotConfiguration()),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>(
 				DoubleSerializer.INSTANCE, DoubleSerializer.INSTANCE.snapshotConfiguration()));
 
 		byte[] serializedSerializersAndConfigs;
@@ -220,7 +217,7 @@ public void testSerializerAndConfigPairsSerializationWithSerializerDeserializati
 		cnfThrowingClassnames.add(IntSerializer.class.getName());
 		cnfThrowingClassnames.add(DoubleSerializer.class.getName());
 
-		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored;
+		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> restored;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) {
 			restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
 				new DataInputViewStreamWrapper(in),
@@ -231,9 +228,13 @@ public void testSerializerAndConfigPairsSerializationWithSerializerDeserializati
 
 		Assert.assertEquals(2, restored.size());
 		Assert.assertTrue(restored.get(0).f0 instanceof UnloadableDummyTypeSerializer);
-		Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1);
+		Assert.assertEquals(
+			IntSerializer.INSTANCE.snapshotConfiguration(),
+			restored.get(0).f1);
 		Assert.assertTrue(restored.get(1).f0 instanceof UnloadableDummyTypeSerializer);
-		Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
+		Assert.assertEquals(
+			DoubleSerializer.INSTANCE.snapshotConfiguration(),
+			restored.get(1).f1);
 	}
 
 	/**
@@ -266,7 +267,7 @@ public void testAnonymousSerializerClassWithChangedSerialVersionUID() throws Exc
 		Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
 	}
 
-	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
+	public static class TestConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 
 		static final int VERSION = 1;
 
@@ -411,12 +412,12 @@ public void serialize(Integer record, DataOutputView target) throws IOException
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
 			return IntSerializer.INSTANCE.snapshotConfiguration();
 		}
 
 		@Override
-		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
 		}
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index e3ce3ee6fca..8f82ea88ed9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -20,8 +20,8 @@
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
@@ -95,15 +95,15 @@ public void testConfigurationSnapshotSerialization() throws Exception {
 
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
-				new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration());
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration(), serializer);
 			serializedConfig = out.toByteArray();
 		}
 
-		TypeSerializerConfigSnapshot restoredConfig;
+		TypeSerializerSnapshot<PublicEnum> restoredConfig;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			restoredConfig = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), serializer);
 		}
 
 		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index 1f67acbab7d..b4f983382f5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
@@ -92,25 +92,27 @@ private static CompatibilityResult checkCompatibility(String enumSourceA, String
 
 		EnumSerializer enumSerializer = new EnumSerializer(classLoader.loadClass(ENUM_NAME));
 
-		TypeSerializerConfigSnapshot snapshot = enumSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot snapshot = enumSerializer.snapshotConfiguration();
 		byte[] snapshotBytes;
 		try (
 			ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
 			DataOutputViewStreamWrapper outputViewStreamWrapper = new DataOutputViewStreamWrapper(outBuffer)) {
 
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outputViewStreamWrapper, snapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				outputViewStreamWrapper, snapshot, enumSerializer);
 			snapshotBytes = outBuffer.toByteArray();
 		}
 
 		ClassLoader classLoader2 = compileAndLoadEnum(
 			temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceB);
 
-		TypeSerializerConfigSnapshot restoredSnapshot;
+		TypeSerializerSnapshot restoredSnapshot;
 		try (
 			ByteArrayInputStream inBuffer = new ByteArrayInputStream(snapshotBytes);
 			DataInputViewStreamWrapper inputViewStreamWrapper = new DataInputViewStreamWrapper(inBuffer)) {
 
-			restoredSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inputViewStreamWrapper, classLoader2);
+			restoredSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				inputViewStreamWrapper, classLoader2, enumSerializer);
 		}
 
 		EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME));
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index bdb3f8fb3e4..9620846a4a6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -41,8 +41,8 @@
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DateSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
@@ -294,10 +294,11 @@ public void testReconfigureWithDifferentPojoType() throws Exception {
 			TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig());
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
+		TypeSerializerSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot, pojoSerializer1);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -306,8 +307,8 @@ public void testReconfigureWithDifferentPojoType() throws Exception {
 
 		// read configuration again from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			pojoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer2);
 		}
 
 		CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
@@ -330,10 +331,11 @@ public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception
 		int subClassBTag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class);
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot, pojoSerializer);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -346,8 +348,8 @@ public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			pojoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer);
 		}
 
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
@@ -376,10 +378,11 @@ public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot, pojoSerializer);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -389,8 +392,8 @@ public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			pojoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer);
 		}
 
 		// reconfigure - check reconfiguration result and that subclass serializer cache is repopulated
@@ -434,10 +437,11 @@ public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
 		assertEquals(0, pojoSerializer.getRegisteredSerializers().length);
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot, pojoSerializer);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -449,8 +453,8 @@ public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			pojoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer);
 		}
 
 		// reconfigure - check reconfiguration result and that
@@ -483,36 +487,36 @@ public void testReconfigureWithDifferentFieldOrder() throws Exception {
 		// creating this serializer just for generating config snapshots of the field serializers
 		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> mockOriginalFieldToSerializerConfigSnapshot =
 			new LinkedHashMap<>(mockOriginalFieldOrder.length);
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[0].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[3],
 				ser.getFieldSerializers()[3].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[1].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[2],
 				ser.getFieldSerializers()[2].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[2].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[5],
 				ser.getFieldSerializers()[5].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[3].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[0],
 				ser.getFieldSerializers()[0].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[4].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[1],
 				ser.getFieldSerializers()[1].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
 			mockOriginalFieldOrder[5].getName(),
-			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+			new Tuple2<>(
 				ser.getFieldSerializers()[4],
 				ser.getFieldSerializers()[4].snapshotConfiguration()));
 
@@ -529,8 +533,8 @@ public void testReconfigureWithDifferentFieldOrder() throws Exception {
 			new PojoSerializer.PojoSerializerConfigSnapshot<>(
 				TestUserClass.class,
 				mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
-				new LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>(), // empty; irrelevant for this test
-				new HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>()); // empty; irrelevant for this test
+				new LinkedHashMap<>(), // empty; irrelevant for this test
+				new HashMap<>()); // empty; irrelevant for this test
 
 		// reconfigure - check reconfiguration result and that fields are reordered to the previous order
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
@@ -552,7 +556,8 @@ public void testSerializerSerializationFailureResilience() throws Exception{
 		byte[] serializedConfig;
 		try (
 			ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), config);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), config, pojoSerializer);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -567,11 +572,12 @@ public void testSerializerSerializationFailureResilience() throws Exception{
 		PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> deserializedConfig;
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
 			deserializedConfig = (PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>)
-				TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+				TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
 					new DataInputViewStreamWrapper(in),
 					new ArtificialCNFExceptionThrowingClassLoader(
 						Thread.currentThread().getContextClassLoader(),
-						cnfThrowingClassnames));
+						cnfThrowingClassnames),
+					pojoSerializer);
 		}
 
 		Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
@@ -582,9 +588,9 @@ private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializatio
 			PojoSerializer.PojoSerializerConfigSnapshot<?> original,
 			PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
 
-		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> originalFieldSerializersAndConfs =
 				original.getFieldToSerializerConfigSnapshot();
-		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
 				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 			Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
@@ -598,10 +604,10 @@ private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializatio
 			}
 		}
 
-		LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalRegistrations =
+		LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> originalRegistrations =
 				original.getRegisteredSubclassesToSerializerConfigSnapshots();
 
-		for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+		for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
 				: deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
 
 			Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 89e9ec3f8c9..3c9d990fb31 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -21,8 +21,9 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
@@ -61,10 +62,10 @@ public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
 		KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
 
 		// read configuration again from bytes
-		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot;
+		TypeSerializerSnapshot kryoSerializerConfigSnapshot;
 		try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) {
-			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForA);
 		}
 		CompatibilityResult<TestClass> compatResult = kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
 		assertFalse(compatResult.isRequiresMigration());
@@ -94,10 +95,11 @@ public void testMigrationStrategyWithDifferentKryoType() throws Exception {
 		KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig());
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
+		TypeSerializerSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot, kryoSerializerForA);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -105,8 +107,8 @@ public void testMigrationStrategyWithDifferentKryoType() throws Exception {
 
 		// read configuration again from bytes
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForB);
 		}
 
 		CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
@@ -248,10 +250,11 @@ public void testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio
 		int testClassBId = kryoSerializer.getKryo().getRegistration(TestClassB.class).getId();
 
 		// snapshot configuration and serialize to bytes
-		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot, kryoSerializer);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -264,8 +267,8 @@ public void testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio
 
 		// read configuration from bytes
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+			kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializer);
 		}
 
 		// reconfigure - check reconfiguration result and that registration id remains the same
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index f3801991be6..5f76b094361 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -225,7 +225,7 @@ public boolean canEqual(Object obj) {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
 			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index b313625bfe2..dad1d6df16a 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -100,7 +100,7 @@
 	private transient Schema schema;
 
 	/** The serializer configuration snapshot, cached for efficiency. */
-	private transient AvroSchemaSerializerConfigSnapshot configSnapshot;
+	private transient AvroSchemaSerializerConfigSnapshot<T> configSnapshot;
 
 	/** The currently accessing thread, set and checked on debug level only. */
 	private transient volatile Thread currentThread;
@@ -264,20 +264,20 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 	// ------------------------------------------------------------------------
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
 		if (configSnapshot == null) {
 			checkAvroInitialized();
-			configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
+			configSnapshot = new AvroSchemaSerializerConfigSnapshot<>(schema.toString(false));
 		}
 		return configSnapshot;
 	}
 
 	@Override
-	@SuppressWarnings("deprecation")
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	@SuppressWarnings({"deprecation", "unchecked"})
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
 			// proper schema snapshot, can do the sophisticated schema-based compatibility check
-			final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+			final String schemaString = ((AvroSchemaSerializerConfigSnapshot<?>) configSnapshot).getSchemaString();
 			final Schema lastSchema = new Schema.Parser().parse(schemaString);
 
 			checkAvroInitialized();
@@ -291,7 +291,7 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
 			// old snapshot case, just compare the type
 			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
 			// only for object-to-object copies.
-			final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
+			final AvroSerializerConfigSnapshot<T> old = (AvroSerializerConfigSnapshot<T>) configSnapshot;
 			return type.equals(old.getTypeClass()) ?
 					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
 		}
@@ -419,7 +419,7 @@ private void exitExclusiveThread() {
 	/**
 	 * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
 	 */
-	public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+	public static final class AvroSchemaSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 
 		private String schemaString;
 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
index e5eb5d89a1f..63b79b2d064 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -172,13 +173,13 @@ public String toString() {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
 		// we return the configuration of the actually used serializer here
 		return serializer.snapshotConfiguration();
 	}
 
 	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot ||
 				configSnapshot instanceof AvroSerializerConfigSnapshot) {
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index cfd1506da9e..7b8763bfa2f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -20,11 +20,10 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
@@ -78,12 +77,12 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
 		// retrieve the old config snapshot
 
 		final TypeSerializer<SimpleUser> serializer;
-		final TypeSerializerConfigSnapshot configSnapshot;
+		final TypeSerializerSnapshot<SimpleUser> configSnapshot;
 
 		try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
 			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
 
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> deserialized =
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> deserialized =
 					TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
 							inView, getClass().getClassLoader());
 
@@ -93,14 +92,14 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
 			final TypeSerializer<SimpleUser> typedSerializer = (TypeSerializer<SimpleUser>) deserialized.get(0).f0;
 
 			serializer = typedSerializer;
-			configSnapshot = deserialized.get(0).f1;
+			configSnapshot = (TypeSerializerSnapshot<SimpleUser>) deserialized.get(0).f1;
 		}
 
 		assertNotNull(serializer);
 		assertNotNull(configSnapshot);
 
 		assertTrue(serializer instanceof PojoSerializer);
-		assertTrue(configSnapshot instanceof PojoSerializerConfigSnapshot);
+		assertTrue(configSnapshot instanceof PojoSerializer.PojoSerializerConfigSnapshot);
 
 		// sanity check for the test: check that the test data works with the original serializer
 		validateDeserialization(serializer);
@@ -114,7 +113,7 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
 		// deserialize the data and make sure this still works
 		validateDeserialization(newSerializer);
 
-		TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration();
+		TypeSerializerSnapshot<SimpleUser> nextSnapshot = newSerializer.snapshotConfiguration();
 		final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 
 		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 579e761c014..7b1441729b1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -395,12 +395,12 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<ElementType> snapshotConfiguration() {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			throw new UnsupportedOperationException();
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index b355c38e858..f5e89d705b8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -852,7 +853,7 @@ private boolean checkFilterCondition(
 	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
 	 */
 	@Deprecated
-	public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<MigratedNFA<T>> {
 
 		private static final int VERSION = 1;
 
@@ -976,15 +977,15 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<MigratedNFA<T>> snapshotConfiguration() {
 			return new NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
 		}
 
 		@Override
 		public CompatibilityResult<MigratedNFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 			if (configSnapshot instanceof NFASerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
-					((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs =
+					((NFASerializerConfigSnapshot<?>) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 					serializersAndConfigs.get(0).f0,
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 6e5f9db2a67..0617310b375 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
@@ -157,7 +158,8 @@ public int hashCode() {
 	/**
 	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
 	 */
-	public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
+	public static final class SharedBufferSerializerConfigSnapshot<K, V>
+			extends CompositeTypeSerializerConfigSnapshot<SharedBuffer<V>> {
 
 		private static final int VERSION = 1;
 
@@ -354,7 +356,7 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<SharedBuffer<V>> snapshotConfiguration() {
 			return new SharedBufferSerializerConfigSnapshot<>(
 				keySerializer,
 				valueSerializer,
@@ -364,8 +366,8 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 		@Override
 		public CompatibilityResult<SharedBuffer<V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 			if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializerConfigSnapshots =
-					((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializerConfigSnapshots =
+					((SharedBufferSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 					serializerConfigSnapshots.get(0).f0,
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index b782d8a07c6..683379927e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -19,10 +19,13 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -181,19 +184,39 @@ public boolean canEqual(Object obj) {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return elementSerializer.snapshotConfiguration();
+		public TypeSerializerConfigSnapshot<Lockable<E>> snapshotConfiguration() {
+			return new LockableSerializerConfigSnapshot<>(elementSerializer);
 		}
 
 		@Override
-		public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			CompatibilityResult<E> inputComaptibilityResult = elementSerializer.ensureCompatibility(configSnapshot);
-			if (inputComaptibilityResult.isRequiresMigration()) {
-				return CompatibilityResult.requiresMigration(new LockableTypeSerializer<>(
-					new TypeDeserializerAdapter<>(inputComaptibilityResult.getConvertDeserializer()))
-				);
+		public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+			if (configSnapshot instanceof LockableSerializerConfigSnapshot) {
+				@SuppressWarnings("unchecked")
+				LockableSerializerConfigSnapshot<E> snapshot = (LockableSerializerConfigSnapshot<E>) configSnapshot;
+
+				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> nestedSerializerAndConfig =
+					snapshot.getSingleNestedSerializerAndConfig();
+
+				CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
+					nestedSerializerAndConfig.f0,
+					UnloadableDummyTypeSerializer.class,
+					nestedSerializerAndConfig.f1,
+					elementSerializer);
+
+				return (inputCompatibilityResult.isRequiresMigration())
+					? CompatibilityResult.requiresMigration()
+					: CompatibilityResult.compatible();
 			} else {
-				return CompatibilityResult.compatible();
+				// backwards compatibility path
+				CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
+					configSnapshot.restoreSerializer(),
+					UnloadableDummyTypeSerializer.class,
+					configSnapshot,
+					elementSerializer);
+
+				return (inputCompatibilityResult.isRequiresMigration())
+					? CompatibilityResult.requiresMigration()
+					: CompatibilityResult.compatible();
 			}
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java
new file mode 100644
index 00000000000..9e78bc013ef
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the {@link Lockable.LockableTypeSerializer}.
+ */
+@Internal
+public class LockableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Lockable<E>> {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public LockableSerializerConfigSnapshot() {}
+
+	public LockableSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+}
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java
new file mode 100644
index 00000000000..b2126028de2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.table.api.dataview.ListView;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the {@link ListViewSerializer}.
+ *
+ * @param <T> the type of the list elements.
+ */
+public final class ListViewSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<ListView<T>> {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ListViewSerializerConfigSnapshot() {}
+
+	public ListViewSerializerConfigSnapshot(ListSerializer<T> listSerializer) {
+		super(listSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java
new file mode 100644
index 00000000000..10b4419a8c7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.table.api.dataview.MapView;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the {@link MapViewSerializer}.
+ *
+ * @param <K> the key type of the map entries.
+ * @param <V> the value type of the map entries.
+ */
+public class MapViewSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot<MapView<K, V>> {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public MapViewSerializerConfigSnapshot() {}
+
+	public MapViewSerializerConfigSnapshot(MapSerializer<K, V> mapSerializer) {
+		super(mapSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index a450c2ce1e5..9a10c118c0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -75,16 +75,35 @@ class ListViewSerializer[T](val listSerializer: ListSerializer[T])
   override def equals(obj: Any): Boolean = canEqual(this) &&
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
-    listSerializer.snapshotConfiguration()
+  override def snapshotConfiguration(): ListViewSerializerConfigSnapshot[T] =
+    new ListViewSerializerConfigSnapshot[T](listSerializer)
 
-  // copy and modified from ListSerializer.ensureCompatibility
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[ListView[T]] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {
 
     configSnapshot match {
-      case snapshot: CollectionSerializerConfigSnapshot[_] =>
-        val previousListSerializerAndConfig = snapshot.getSingleNestedSerializerAndConfig
+      case snapshot: ListViewSerializerConfigSnapshot[T] =>
+        val previousListSerializerAndConfig =
+          snapshot.getSingleNestedSerializerAndConfig
+
+        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+          previousListSerializerAndConfig.f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousListSerializerAndConfig.f1,
+          listSerializer)
+
+        if (!compatResult.isRequiresMigration) {
+          CompatibilityResult.compatible[ListView[T]]
+        } else {
+          CompatibilityResult.requiresMigration[ListView[T]]
+        }
+
+      // backwards compatibility path;
+      // Flink versions older than or equal to 1.5.x return a
+      // CollectionSerializerConfigSnapshot as the snapshot
+      case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] =>
+        val previousListSerializerAndConfig =
+          legacySnapshot.getSingleNestedSerializerAndConfig
 
         val compatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousListSerializerAndConfig.f0,
@@ -94,13 +113,6 @@ class ListViewSerializer[T](val listSerializer: ListSerializer[T])
 
         if (!compatResult.isRequiresMigration) {
           CompatibilityResult.compatible[ListView[T]]
-        } else if (compatResult.getConvertDeserializer != null) {
-          CompatibilityResult.requiresMigration(
-            new ListViewSerializer[T](
-              new ListSerializer[T](
-                new TypeDeserializerAdapter[T](compatResult.getConvertDeserializer))
-            )
-          )
         } else {
           CompatibilityResult.requiresMigration[ListView[T]]
         }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index c53f10c37e5..d6419e81ab3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -77,22 +77,42 @@ class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
   override def equals(obj: Any): Boolean = canEqual(this) &&
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
-    mapSerializer.snapshotConfiguration()
+  override def snapshotConfiguration(): MapViewSerializerConfigSnapshot[K, V] =
+    new MapViewSerializerConfigSnapshot[K, V](mapSerializer)
 
   // copy and modified from MapSerializer.ensureCompatibility
-  override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot)
+  override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])
   : CompatibilityResult[MapView[K, V]] = {
 
     configSnapshot match {
-      case snapshot: MapSerializerConfigSnapshot[_, _] =>
-        val previousKvSerializersAndConfigs = snapshot.getNestedSerializersAndConfigs
+      case snapshot: MapViewSerializerConfigSnapshot[K, V] =>
+        val previousKvSerializersAndConfigs =
+          snapshot.getNestedSerializersAndConfigs
+
+        val mapSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+          previousKvSerializersAndConfigs.get(0).f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousKvSerializersAndConfigs.get(0).f1,
+          mapSerializer)
+
+        if (!mapSerializerCompatResult.isRequiresMigration) {
+          CompatibilityResult.compatible[MapView[K, V]]
+        } else {
+          CompatibilityResult.requiresMigration[MapView[K, V]]
+        }
+
+      // backwards compatibility path;
+      // Flink versions older than or equal to 1.5.x return a
+      // MapSerializerConfigSnapshot as the snapshot
+      case legacySnapshot: MapSerializerConfigSnapshot[K, V] =>
+        val previousKvSerializersAndConfigs =
+          legacySnapshot.getNestedSerializersAndConfigs
 
         val keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousKvSerializersAndConfigs.get(0).f0,
           classOf[UnloadableDummyTypeSerializer[_]],
           previousKvSerializersAndConfigs.get(0).f1,
           mapSerializer.getKeySerializer)
 
         val valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousKvSerializersAndConfigs.get(1).f0,
@@ -102,15 +122,6 @@ class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
 
         if (!keyCompatResult.isRequiresMigration && !valueCompatResult.isRequiresMigration) {
           CompatibilityResult.compatible[MapView[K, V]]
-        } else if (keyCompatResult.getConvertDeserializer != null
-          && valueCompatResult.getConvertDeserializer != null) {
-          CompatibilityResult.requiresMigration(
-            new MapViewSerializer[K, V](
-              new MapSerializer[K, V](
-                new TypeDeserializerAdapter[K](keyCompatResult.getConvertDeserializer),
-                new TypeDeserializerAdapter[V](valueCompatResult.getConvertDeserializer))
-            )
-          )
         } else {
           CompatibilityResult.requiresMigration[MapView[K, V]]
         }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 9418095d5f7..0ce3aee3739 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -80,12 +80,12 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
     new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer)
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[CRow] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[CRow] = {
 
     configSnapshot match {
       case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
@@ -116,7 +116,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 object CRowSerializer {
 
   class CRowSerializerConfigSnapshot(rowSerializers: TypeSerializer[Row]*)
-    extends CompositeTypeSerializerConfigSnapshot(rowSerializers: _*) {
+    extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
 
     override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
   }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 3c4f4b0a269..36144502e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -145,15 +146,15 @@ public int hashCode() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<ArrayList<T>> snapshotConfiguration() {
 		return new CollectionSerializerConfigSnapshot<>(elementSerializer);
 	}
 
 	@Override
-	public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+			Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousElemSerializerAndConfig =
+				((CollectionSerializerConfigSnapshot<?, ?>) configSnapshot).getSingleNestedSerializerAndConfig();
 
 			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
 					previousElemSerializerAndConfig.f0,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index e42f223051e..ab6e8b1a046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.typeutils.BackwardsCompatibleSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
@@ -33,8 +35,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
 
@@ -44,7 +47,17 @@
  */
 public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
 
-	public static final int VERSION = 5;
+	public static final int VERSION = 6;
+
+	private static final Map<Integer, Integer> META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER = new HashMap<>();
+	static {
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(1, 1);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(2, 2);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(3, 3);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(4, 4);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(5, 5);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(6, CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
+	}
 
 	//TODO allow for more (user defined) compression formats + backwards compatibility story.
 	/** This specifies if we use a compressed format write the key-groups */
@@ -53,8 +66,9 @@
 	/** This specifies whether or not to use dummy {@link UnloadableDummyTypeSerializer} when serializers cannot be read. */
 	private boolean isSerializerPresenceRequired;
 
+	// TODO the keySerializer field should be removed, once all serializers have the restoreSerializer() method implemented
 	private TypeSerializer<K> keySerializer;
-	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
+	private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
 
 	private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
@@ -84,11 +98,11 @@ public KeyedBackendSerializationProxy(
 		return stateMetaInfoSnapshots;
 	}
 
-	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
+	public TypeSerializer<K> restoreKeySerializer() {
+		return keySerializerConfigSnapshot.restoreSerializer();
 	}
 
-	public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
+	public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
 		return keySerializerConfigSnapshot;
 	}
 
@@ -103,7 +117,7 @@ public int getVersion() {
 
 	@Override
 	public int[] getCompatibleVersions() {
-		return new int[]{VERSION, 4, 3, 2, 1};
+		return new int[]{VERSION, 5, 4, 3, 2, 1};
 	}
 
 	@Override
@@ -113,10 +127,7 @@ public void write(DataOutputView out) throws IOException {
 		// write the compression format used to write each key-group
 		out.writeBoolean(usingKeyGroupCompression);
 
-		// write in a way to be fault tolerant of read failures when deserializing the key serializer
-		TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
-				out,
-				Collections.singletonList(new Tuple2<>(keySerializer, keySerializerConfigSnapshot)));
+		TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerConfigSnapshot, keySerializer);
 
 		// write individual registered keyed state metainfos
 		out.writeShort(stateMetaInfoSnapshots.size());
@@ -139,23 +150,30 @@ public void read(DataInputView in) throws IOException {
 		}
 
 		// only starting from version 3, we have the key serializer and its config snapshot written
-		if (readVersion >= 3) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig =
+		if (readVersion >= 6) {
+			this.keySerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				in, userCodeClassLoader, null);
+		} else if (readVersion >= 3) {
+			Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> keySerializerAndConfig =
 					TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
-			this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0;
-			this.keySerializerConfigSnapshot = keySerializerAndConfig.f1;
+			this.keySerializerConfigSnapshot = (TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
 		} else {
-			this.keySerializer = TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true);
-			this.keySerializerConfigSnapshot = null;
+			this.keySerializerConfigSnapshot = new BackwardsCompatibleSerializerSnapshot<>(
+				TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
 		}
+		this.keySerializer = null;
 
 		if (isSerializerPresenceRequired) {
-			checkSerializerPresence(keySerializer);
+			checkSerializerPresence(this.keySerializerConfigSnapshot.restoreSerializer());
 		}
 
-		int metaInfoVersion = readVersion > 4 ? CURRENT_STATE_META_INFO_SNAPSHOT_VERSION : readVersion;
+		Integer metaInfoSnapshotVersion = META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.get(readVersion);
+		if (metaInfoSnapshotVersion == null) {
+			// this should not happen; guard for the future
+			throw new IOException("Cannot determine corresponding meta info snapshot version for keyed backend serialization readVersion=" + readVersion);
+		}
 		final StateMetaInfoReader stateMetaInfoReader = StateMetaInfoSnapshotReadersWriters.getReader(
-			metaInfoVersion,
+			metaInfoSnapshotVersion,
 			StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
 
 		int numKvStates = in.readShort();
@@ -165,9 +183,9 @@ public void read(DataInputView in) throws IOException {
 
 			if (isSerializerPresenceRequired) {
 				checkSerializerPresence(
-					snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+					snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
 				checkSerializerPresence(
-					snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+					snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
 			}
 			stateMetaInfoSnapshots.add(snapshot);
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index 6c01501fc03..f828d602c3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -28,7 +28,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
 
@@ -38,7 +40,16 @@
  */
 public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
 
-	public static final int VERSION = 4;
+	public static final int VERSION = 5;
+
+	private static final Map<Integer, Integer> META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER = new HashMap<>();
+	static {
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(1, 1);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(2, 2);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(3, 3);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(4, 5);
+		META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.put(5, CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
+	}
 
 	private List<StateMetaInfoSnapshot> operatorStateMetaInfoSnapshots;
 	private List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots;
@@ -67,7 +78,7 @@ public int getVersion() {
 
 	@Override
 	public int[] getCompatibleVersions() {
-		return new int[] {VERSION, 3, 2, 1};
+		return new int[] {VERSION, 4, 3, 2, 1};
 	}
 
 	@Override
@@ -91,11 +102,14 @@ public void read(DataInputView in) throws IOException {
 		super.read(in);
 
 		final int proxyReadVersion = getReadVersion();
-		final int metaInfoReadVersion = proxyReadVersion > 3 ?
-			CURRENT_STATE_META_INFO_SNAPSHOT_VERSION : proxyReadVersion;
+		final Integer metaInfoSnapshotVersion = META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.get(proxyReadVersion);
+		if (metaInfoSnapshotVersion == null) {
+			// this should not happen; guard for the future
+			throw new IOException("Cannot determine corresponding meta info snapshot version for operator backend serialization readVersion=" + proxyReadVersion);
+		}
 
 		final StateMetaInfoReader stateMetaInfoReader = StateMetaInfoSnapshotReadersWriters.getReader(
-			metaInfoReadVersion,
+			metaInfoSnapshotVersion,
 			StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
 
 		int numOperatorStates = in.readShort();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index 02ab8eff02f..70a14142474 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -72,9 +72,9 @@ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sn
 			OperatorStateHandle.Mode.valueOf(
 				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
 			(TypeSerializer<K>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
 			(TypeSerializer<V>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
 	}
 
@@ -151,7 +151,7 @@ private StateMetaInfoSnapshot computeSnapshot() {
 			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
 			assignmentMode.toString());
 		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = new HashMap<>(2);
 		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
 		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
 		serializerMap.put(keySerializerKey, keySerializer.duplicate());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 789551da0e2..7f95ed70326 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -22,7 +22,7 @@
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
@@ -82,9 +82,9 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
 			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
 			snapshot.getName(),
 			(TypeSerializer<N>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
 			(TypeSerializer<S>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)), null);
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)), null);
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
 	}
 
@@ -192,7 +192,7 @@ public int hashCode() {
 
 		// check compatibility results to determine if state migration is required
 		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
 			null,
 			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
 				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
@@ -200,7 +200,7 @@ public int hashCode() {
 
 		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
 		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(
+			restoredStateMetaInfoSnapshot.restoreTypeSerializer(
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
 			UnloadableDummyTypeSerializer.class,
 			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
@@ -232,7 +232,7 @@ private StateMetaInfoSnapshot computeSnapshot() {
 			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
 			stateType.toString());
 		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = new HashMap<>(2);
 		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
 		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
 		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index b65671e4483..10ba0296057 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -69,7 +69,7 @@ public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
 		this(
 			snapshot.getName(),
 			(TypeSerializer<S>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
 			OperatorStateHandle.Mode.valueOf(
 				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
@@ -140,7 +140,7 @@ private StateMetaInfoSnapshot computeSnapshot() {
 		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
 		Map<String, TypeSerializer<?>> serializerMap =
 			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+		Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
 			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
 
 		return new StateMetaInfoSnapshot(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 9ef23ed0cfe..0304b929c6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +48,7 @@ public RegisteredPriorityQueueStateBackendMetaInfo(
 	public RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
 		this(snapshot.getName(),
 			(TypeSerializer<T>) Preconditions.checkNotNull(
-				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+				snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType());
 	}
 
@@ -68,7 +68,7 @@ private StateMetaInfoSnapshot computeSnapshot() {
 			Collections.singletonMap(
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
 				elementSerializer.duplicate());
-		Map<String, TypeSerializerConfigSnapshot> serializerSnapshotMap =
+		Map<String, TypeSerializerSnapshot<?>> serializerSnapshotMap =
 			Collections.singletonMap(
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
 				elementSerializer.snapshotConfiguration());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 05070f92367..837e51fafc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -207,7 +207,7 @@ public HeapKeyedStateBackend(
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
 			CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfoSnapshot.getTypeSerializer(serializerKey),
+				restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
 				null,
 				restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
 				byteOrderedElementSerializer);
@@ -416,7 +416,7 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
 					// check for key serializer compatibility; this also reconfigures the
 					// key serializer to be compatible, if it is required and is possible
 					if (CompatibilityUtil.resolveCompatibilityResult(
-							serializationProxy.getKeySerializer(),
+							serializationProxy.restoreKeySerializer(),
 							UnloadableDummyTypeSerializer.class,
 							serializationProxy.getKeySerializerConfigSnapshot(),
 							keySerializer)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index 2f83857d67d..0e2d0056581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -56,6 +56,7 @@
 			case 3:
 			case 4:
 			case 5:
+			case 6:
 				return createV2PlusReader(stateTable);
 			default:
 				throw new IllegalArgumentException("Unknown version: " + version);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index a3b110939b3..77c267adff1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.state.metainfo;
 
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.BackwardsCompatibleSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -60,21 +61,14 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 
 			final StateDescriptor.Type stateDescType = StateDescriptor.Type.values()[in.readInt()];
 			final String stateName = in.readUTF();
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
 
 			Map<String, String> optionsMap = Collections.singletonMap(
 				StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
 				stateDescType.toString());
 
-
-			Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-			serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
-				serializersAndConfigs.get(0).f0);
-			serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
-				serializersAndConfigs.get(1).f0);
-
-			Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotMap = new HashMap<>(2);
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotMap = new HashMap<>(2);
 			serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
 				serializersAndConfigs.get(0).f1);
 			serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
@@ -84,8 +78,7 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 				stateName,
 				StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
 				optionsMap,
-				serializerConfigSnapshotMap,
-				serializerMap);
+				serializerConfigSnapshotMap);
 		}
 	}
 
@@ -111,20 +104,21 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 				StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
 				stateDescType.toString());
 
-
-			Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-			serializerMap.put(
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotMap = new HashMap<>(2);
+			serializerConfigSnapshotMap.put(
 				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
-				TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
-			serializerMap.put(
+				new BackwardsCompatibleSerializerSnapshot<>(
+					TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)));
+			serializerConfigSnapshotMap.put(
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
-				TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
+				new BackwardsCompatibleSerializerSnapshot<>(
+					TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)));
+
 			return new StateMetaInfoSnapshot(
 				stateName,
 				StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
 				optionsMap,
-				Collections.emptyMap(),
-				serializerMap);
+				serializerConfigSnapshotMap);
 		}
 	}
 
@@ -156,24 +150,20 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 				StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
 				mode.toString());
 
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> stateSerializerAndConfigList =
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> stateSerializerAndConfigList =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
 
 			final int listSize = stateSerializerAndConfigList.size();
 			StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
 				StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
-			Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(listSize);
-			Map<String, TypeSerializerConfigSnapshot> serializerConfigsMap = new HashMap<>(listSize);
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
 			for (int i = 0; i < listSize; ++i) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serializerAndConf =
+				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerAndConf =
 					stateSerializerAndConfigList.get(i);
 
 				// this particular mapping happens to support both, V2 and V3
 				String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
 
-				serializerMap.put(
-					serializerKey,
-					serializerAndConf.f0);
 				serializerConfigsMap.put(
 					serializerKey,
 					serializerAndConf.f1);
@@ -183,8 +173,7 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 				name,
 				stateType,
 				optionsMap,
-				serializerConfigsMap,
-				serializerMap);
+				serializerConfigsMap);
 		}
 	}
 
@@ -220,10 +209,9 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 					name,
 					StateMetaInfoSnapshot.BackendStateType.OPERATOR,
 					optionsMap,
-					Collections.emptyMap(),
 					Collections.singletonMap(
 						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
-						stateSerializer));
+						new BackwardsCompatibleSerializerSnapshot<>(stateSerializer)));
 			} catch (ClassNotFoundException exception) {
 				throw new IOException(exception);
 			} finally {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 5a3190c7545..1e9d9191079 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -20,13 +20,14 @@
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -80,9 +81,9 @@
 
 	/** The configurations of all the type serializers used with the state. */
 	@Nonnull
-	private final Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshots;
+	private final Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots;
 
-	// TODO this will go awy again after FLINK-9377 is merged, that is why it is currently duplicated here.
+	// TODO this will go away once all serializers have the restoreSerializer() factory method properly implemented.
 	/** The serializers used by the state. */
 	@Nonnull
 	private final Map<String, TypeSerializer<?>> serializers;
@@ -91,7 +92,21 @@ public StateMetaInfoSnapshot(
 		@Nonnull String name,
 		@Nonnull BackendStateType backendStateType,
 		@Nonnull Map<String, String> options,
-		@Nonnull Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshots,
+		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots) {
+		this(name, backendStateType, options, serializerConfigSnapshots, new HashMap<>());
+	}
+
+	/**
+	 * TODO this variant, which requires providing the serializers,
+	 * TODO should actually be removed, leaving only {@link #StateMetaInfoSnapshot(String, BackendStateType, Map, Map)}.
+	 * TODO This is still used by snapshot extracting methods (i.e. computeSnapshot() method of specific state meta
+	 * TODO info subclasses), and will be removed once all serializers have the restoreSerializer() factory method implemented.
+	 */
+	public StateMetaInfoSnapshot(
+		@Nonnull String name,
+		@Nonnull BackendStateType backendStateType,
+		@Nonnull Map<String, String> options,
+		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots,
 		@Nonnull Map<String, TypeSerializer<?>> serializers) {
 		this.name = name;
 		this.backendStateType = backendStateType;
@@ -106,12 +121,12 @@ public BackendStateType getBackendStateType() {
 	}
 
 	@Nullable
-	public TypeSerializerConfigSnapshot getTypeSerializerConfigSnapshot(@Nonnull String key) {
+	public TypeSerializerSnapshot<?> getTypeSerializerConfigSnapshot(@Nonnull String key) {
 		return serializerConfigSnapshots.get(key);
 	}
 
 	@Nullable
-	public TypeSerializerConfigSnapshot getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
+	public TypeSerializerSnapshot<?> getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
 		return getTypeSerializerConfigSnapshot(key.toString());
 	}
 
@@ -136,22 +151,26 @@ public String getName() {
 	}
 
 	@Nullable
-	public TypeSerializer<?> getTypeSerializer(@Nonnull String key) {
-		return serializers.get(key);
+	public TypeSerializer<?> restoreTypeSerializer(@Nonnull String key) {
+		TypeSerializerSnapshot<?> configSnapshot = getTypeSerializerConfigSnapshot(key);
+		return (configSnapshot != null) ? configSnapshot.restoreSerializer() : null;
 	}
 
 	@Nullable
-	public TypeSerializer<?> getTypeSerializer(@Nonnull CommonSerializerKeys key) {
-		return getTypeSerializer(key.toString());
+	public TypeSerializer<?> restoreTypeSerializer(@Nonnull CommonSerializerKeys key) {
+		return restoreTypeSerializer(key.toString());
 	}
 
 	@Nonnull
-	public Map<String, TypeSerializerConfigSnapshot> getSerializerConfigSnapshotsImmutable() {
+	public Map<String, TypeSerializerSnapshot<?>> getSerializerConfigSnapshotsImmutable() {
 		return Collections.unmodifiableMap(serializerConfigSnapshots);
 	}
 
-	@Nonnull
-	public Map<String, TypeSerializer<?>> getSerializersImmutable() {
-		return Collections.unmodifiableMap(serializers);
+	/**
+	 * TODO this method should be removed once the serializer map is removed.
+	 */
+	@Nullable
+	public TypeSerializer<?> getTypeSerializer(@Nonnull String key) {
+		return serializers.get(key);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
index 926e75f7574..4408dfcacef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.state.metainfo;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
@@ -43,12 +43,12 @@ private StateMetaInfoSnapshotReadersWriters() {}
 
 	/**
 	 * Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
-	 * - v5: Flink 1.6.x
+	 * - v6: since Flink 1.7.x
 	 */
-	public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
+	public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 6;
 
 	/**
-	 * Enum for backeards compatibility. This gives a hint about the expected state type for which a
+	 * Enum for backwards compatibility. This gives a hint about the expected state type for which a
 	 * {@link StateMetaInfoSnapshot} should be deserialized.
 	 *
 	 * TODO this can go away after we eventually drop backwards compatibility with all versions < 5.
@@ -76,7 +76,8 @@ public static StateMetaInfoWriter getWriter() {
 	@Nonnull
 	public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
 
-		if (readVersion < CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+		if (readVersion < 5) {
+			// versions before 5 still had different state meta info formats between keyed / operator state
 			switch (stateTypeHint) {
 				case KEYED_STATE:
 					return getLegacyKeyedStateMetaInfoReader(readVersion);
@@ -98,12 +99,14 @@ public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeH
 	 * @return the requested reader.
 	 */
 	@Nonnull
-	public static StateMetaInfoReader getReader(int readVersion) {
-		if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
-			// latest version shortcut
-			return CurrentReaderImpl.INSTANCE;
-		} else {
-			throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
+	static StateMetaInfoReader getReader(int readVersion) {
+		switch (readVersion) {
+			case CURRENT_STATE_META_INFO_SNAPSHOT_VERSION:
+				return CurrentReaderImpl.INSTANCE;
+			case 5:
+				return V5ReaderImpl.INSTANCE;
+			default:
+					throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
 		}
 	}
 
@@ -138,10 +141,19 @@ private static StateMetaInfoReader getLegacyOperatorStateMetaInfoReader(int read
 		}
 	}
 
-	//----------------------------------------------------------
+	// ---------------------------------------------------------------------------------
+	//  Current version reader / writer implementation
+	// ---------------------------------------------------------------------------------
 
 	/**
-	 * Implementation of {@link StateMetaInfoWriter}.
+	 * Implementation of {@link StateMetaInfoWriter} for the current version. The serialization format is as follows:
+	 *
+	 * <ul>
+	 *     <li>1. State name (UTF)</li>
+	 *     <li>2. State backend type enum ordinal (int)</li>
+	 *     <li>3. Meta info options map, consisting of the map size (int) followed by the key value pairs (String, String)</li>
+	 *     <li>4. Serializer configuration map, consisting of the map size (int) followed by the key value pairs (String, TypeSerializerSnapshot)</li>
+	 * </ul>
 	 */
 	static class CurrentWriterImpl implements StateMetaInfoWriter {
 
@@ -152,10 +164,8 @@ public void writeStateMetaInfoSnapshot(
 			@Nonnull StateMetaInfoSnapshot snapshot,
 			@Nonnull DataOutputView outputView) throws IOException {
 			final Map<String, String> optionsMap = snapshot.getOptionsImmutable();
-			final Map<String, TypeSerializer<?>> serializerMap = snapshot.getSerializersImmutable();
-			final Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+			final Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
 				snapshot.getSerializerConfigSnapshotsImmutable();
-			Preconditions.checkState(serializerMap.size() == serializerConfigSnapshotsMap.size());
 
 			outputView.writeUTF(snapshot.getName());
 			outputView.writeInt(snapshot.getBackendStateType().ordinal());
@@ -165,21 +175,14 @@ public void writeStateMetaInfoSnapshot(
 				outputView.writeUTF(entry.getValue());
 			}
 
-			outputView.writeInt(serializerMap.size());
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersWithConfig =
-				new ArrayList<>(serializerMap.size());
-
-			for (Map.Entry<String, TypeSerializer<?>> entry : serializerMap.entrySet()) {
+			outputView.writeInt(serializerConfigSnapshotsMap.size());
+			for (Map.Entry<String, TypeSerializerSnapshot<?>> entry : serializerConfigSnapshotsMap.entrySet()) {
 				final String key = entry.getKey();
-				outputView.writeUTF(key);
-
-				TypeSerializerConfigSnapshot configForSerializer =
-					Preconditions.checkNotNull(serializerConfigSnapshotsMap.get(key));
+				outputView.writeUTF(entry.getKey());
 
-				serializersWithConfig.add(new Tuple2<>(entry.getValue(), configForSerializer));
+				TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+					outputView, (TypeSerializerSnapshot) entry.getValue(), snapshot.getTypeSerializer(key));
 			}
-
-			TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(outputView, serializersWithConfig);
 		}
 	}
 
@@ -190,6 +193,48 @@ public void writeStateMetaInfoSnapshot(
 
 		private static final CurrentReaderImpl INSTANCE = new CurrentReaderImpl();
 
+		@Nonnull
+		@Override
+		public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+			@Nonnull DataInputView inputView,
+			@Nonnull ClassLoader userCodeClassLoader) throws IOException {
+
+			final String stateName = inputView.readUTF();
+			final StateMetaInfoSnapshot.BackendStateType stateType =
+				StateMetaInfoSnapshot.BackendStateType.values()[inputView.readInt()];
+			final int numOptions = inputView.readInt();
+			HashMap<String, String> optionsMap = new HashMap<>(numOptions);
+			for (int i = 0; i < numOptions; ++i) {
+				String key = inputView.readUTF();
+				String value = inputView.readUTF();
+				optionsMap.put(key, value);
+			}
+
+			final int numSerializerConfigSnapshots = inputView.readInt();
+			final HashMap<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(numSerializerConfigSnapshots);
+
+			for (int i = 0; i < numSerializerConfigSnapshots; ++i) {
+				serializerConfigsMap.put(
+					inputView.readUTF(),
+					TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+						inputView, userCodeClassLoader, null));
+			}
+
+			return new StateMetaInfoSnapshot(stateName, stateType, optionsMap, serializerConfigsMap);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------
+	//  Legacy reader implementations
+	// ---------------------------------------------------------------------------------
+
+	/**
+	 * Implementation of {@link StateMetaInfoReader} for version 5 (Flink 1.6.x) and generic for all state types.
+	 */
+	static class V5ReaderImpl implements StateMetaInfoReader {
+
+		private static final V5ReaderImpl INSTANCE = new V5ReaderImpl();
+
 		@Nonnull
 		@Override
 		public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -208,24 +253,22 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 			}
 			final int numSerializer = inputView.readInt();
 			final ArrayList<String> serializerKeys = new ArrayList<>(numSerializer);
-			final HashMap<String, TypeSerializer<?>> serializerMap = new HashMap<>(numSerializer);
-			final HashMap<String, TypeSerializerConfigSnapshot> serializerConfigsMap = new HashMap<>(numSerializer);
+			final HashMap<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(numSerializer);
 
 			for (int i = 0; i < numSerializer; ++i) {
 				serializerKeys.add(inputView.readUTF());
 			}
-			final List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersWithConfig =
+			final List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersWithConfig =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(inputView, userCodeClassLoader);
 
 			for (int i = 0; i < numSerializer; ++i) {
 				String key = serializerKeys.get(i);
-				final Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serializerConfigTuple =
+				final Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerConfigTuple =
 					serializersWithConfig.get(i);
-				serializerMap.put(key, serializerConfigTuple.f0);
 				serializerConfigsMap.put(key, serializerConfigTuple.f1);
 			}
 
-			return new StateMetaInfoSnapshot(stateName, stateType, optionsMap, serializerConfigsMap, serializerMap);
+			return new StateMetaInfoSnapshot(stateName, stateType, optionsMap, serializerConfigsMap);
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 4bdc5e85cdd..67d8a26c813 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -131,12 +131,12 @@ public int hashCode() {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<IntList> snapshotConfiguration() {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 0ae5e71d428..fe456c8204c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -140,12 +140,12 @@ public boolean equals(Object obj) {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<IntPair> snapshotConfiguration() {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 17ee5f1530b..cad78e7c018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -108,12 +108,12 @@ public int hashCode() {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<StringPair> snapshotConfiguration() {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index c1c56bf250b..82d89e05a4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -401,12 +401,12 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<String> snapshotConfiguration() {
 			return null;
 		}
 
 		@Override
-		public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			return null;
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index b5988f30f84..ab09557691a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -350,12 +350,12 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
 			return IntSerializer.INSTANCE.snapshotConfiguration();
 		}
 
 		@Override
-		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 9c487a4dc46..97665518d38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -79,7 +79,7 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception {
 		}
 
 		Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
-		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
+		Assert.assertEquals(keySerializer, serializationProxy.restoreKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 		assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
 	}
@@ -128,7 +128,7 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
 		}
 
 		Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
-		Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
+		Assert.assertTrue(serializationProxy.restoreKeySerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 
 		for (StateMetaInfoSnapshot snapshot : serializationProxy.getStateMetaInfoSnapshots()) {
@@ -435,7 +435,6 @@ private void assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S
 		Assert.assertEquals(expected.getName(), actual.getName());
 		Assert.assertEquals(expected.getBackendStateType(), actual.getBackendStateType());
 		Assert.assertEquals(expected.getOptionsImmutable(), actual.getOptionsImmutable());
-		Assert.assertEquals(expected.getSerializersImmutable(), actual.getSerializersImmutable());
 		Assert.assertEquals(
 			expected.getSerializerConfigSnapshotsImmutable(),
 			actual.getSerializerConfigSnapshotsImmutable());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 2634268c947..0712b641678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -4550,14 +4550,14 @@ private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new ParameterlessTypeSerializerConfig(getClass().getName());
+		public TypeSerializerConfigSnapshot<TestCustomStateClass> snapshotConfiguration() {
+			return new ParameterlessTypeSerializerConfig<>(getClass().getName());
 		}
 
 		@Override
-		public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			if (configSnapshot instanceof ParameterlessTypeSerializerConfig &&
-					((ParameterlessTypeSerializerConfig) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
+					((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
 
 				this.reconfigured = true;
 				return CompatibilityResult.compatible();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 2c48e4b132a..54cd9c9c837 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -654,12 +654,12 @@ public void disable() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			throw new UnsupportedOperationException();
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
index ce23f30006b..87d91e2ea0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
@@ -145,12 +145,12 @@ public int hashCode() {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<Record> snapshotConfiguration() {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public CompatibilityResult<Record> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+	public CompatibilityResult<Record> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		throw new UnsupportedOperationException();
 	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
new file mode 100644
index 00000000000..9566a663169
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import scala.util.Either;
+
+/**
+ * Configuration snapshot for serializers of Scala's {@link Either} type,
+ * containing configuration snapshots of the Left and Right serializers.
+ */
+public class ScalaEitherSerializerConfigSnapshot<E extends Either<L, R>, L, R>
+		extends CompositeTypeSerializerConfigSnapshot<E> {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaEitherSerializerConfigSnapshot() {}
+
+	public ScalaEitherSerializerConfigSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
+		super(leftSerializer, rightSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index 03eef12e9ca..215bd447c05 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -22,6 +22,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
+import scala.Option;
+
 /**
  * A {@link TypeSerializerConfigSnapshot} for the Scala {@link OptionSerializer}.
  *
@@ -29,7 +31,7 @@
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
  */
-public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
index 6abb3ea06d7..72baca44193 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
@@ -22,6 +22,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
+import scala.util.Try;
+
 /**
  * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TrySerializer}.
  *
@@ -29,7 +31,7 @@
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
  */
-public class ScalaTrySerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+public class ScalaTrySerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Try<E>> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
index 9a39421dad5..b7f9ca6ec1a 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
@@ -22,6 +22,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
+import scala.collection.TraversableOnce;
+
 /**
  * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TraversableSerializer}.
  *
@@ -29,7 +31,8 @@
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
  */
-public class TraversableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+public class TraversableSerializerConfigSnapshot<T extends TraversableOnce<E>, E>
+		extends CompositeTypeSerializerConfigSnapshot<T> {
 
 	private static final int VERSION = 1;
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 439e0c2865d..82637befdcd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -111,51 +111,51 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): EitherSerializerConfigSnapshot[A, B] = {
-    new EitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer)
+  override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[T, A, B] = {
+    new ScalaEitherSerializerConfigSnapshot[T, A, B](leftSerializer, rightSerializer)
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[T] = {
 
     configSnapshot match {
-      case eitherSerializerConfig: EitherSerializerConfigSnapshot[A, B] =>
-        val previousLeftRightSerWithConfigs =
-          eitherSerializerConfig.getNestedSerializersAndConfigs
-
-        val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousLeftRightSerWithConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousLeftRightSerWithConfigs.get(0).f1,
-          leftSerializer)
-
-        val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousLeftRightSerWithConfigs.get(1).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousLeftRightSerWithConfigs.get(1).f1,
-          rightSerializer)
-
-        if (leftCompatResult.isRequiresMigration
-            || rightCompatResult.isRequiresMigration) {
-
-          if (leftCompatResult.getConvertDeserializer != null
-              && rightCompatResult.getConvertDeserializer != null) {
-
-            CompatibilityResult.requiresMigration(
-              new EitherSerializer[A, B, T](
-                new TypeDeserializerAdapter(leftCompatResult.getConvertDeserializer),
-                new TypeDeserializerAdapter(rightCompatResult.getConvertDeserializer)
-              )
-            )
-
-          } else {
-            CompatibilityResult.requiresMigration()
-          }
-        } else {
-          CompatibilityResult.compatible()
-        }
+      case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[T, A, B] =>
+        checkCompatibility(eitherSerializerConfig)
+
+      // backwards compatibility path;
+      // Flink versions older than or equal to 1.5.x use an
+      // EitherSerializerConfigSnapshot as the snapshot
+      case legacyConfig: EitherSerializerConfigSnapshot[A, B] =>
+        checkCompatibility(legacyConfig)
 
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def checkCompatibility(
+      configSnapshot: CompositeTypeSerializerConfigSnapshot[_]
+    ): CompatibilityResult[T] = {
+
+    val previousLeftRightSerWithConfigs =
+      configSnapshot.getNestedSerializersAndConfigs
+
+    val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+      previousLeftRightSerWithConfigs.get(0).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousLeftRightSerWithConfigs.get(0).f1,
+      leftSerializer)
+
+    val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+      previousLeftRightSerWithConfigs.get(1).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousLeftRightSerWithConfigs.get(1).f1,
+      rightSerializer)
+
+    if (leftCompatResult.isRequiresMigration
+      || rightCompatResult.isRequiresMigration) {
+      CompatibilityResult.requiresMigration()
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index abc56d133e0..b7e21ceaaee 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -84,7 +84,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[E#Value] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[E#Value] = {
 
     configSnapshot match {
       case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
@@ -122,7 +122,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
 object EnumValueSerializer {
 
   class ScalaEnumSerializerConfigSnapshot[E <: Enumeration]
-      extends TypeSerializerConfigSnapshot {
+      extends TypeSerializerConfigSnapshot[E#Value] {
 
     var enumClass: Class[E] = _
     var enumConstants: List[(String, Int)] = _
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index 01ca29594e2..eff57b68755 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -56,11 +56,11 @@ class NothingSerializer extends TypeSerializer[Any] {
   override def deserialize(reuse: Any, source: DataInputView): Any =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot[Any] =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Any] =
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Any] =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
   override def equals(obj: Any): Boolean = {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index aa4a0ea75b8..7f3aa8cd81f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -106,21 +106,21 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {
 
     configSnapshot match {
       case optionSerializerConfigSnapshot
           : ScalaOptionSerializerConfigSnapshot[A] =>
-        ensureCompatibility(optionSerializerConfigSnapshot)
+        ensureCompatibilityInternal(optionSerializerConfigSnapshot)
       case legacyOptionSerializerConfigSnapshot
           : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        ensureCompatibility(legacyOptionSerializerConfigSnapshot)
+        ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
 
-  private def ensureCompatibility(
-      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+  private def ensureCompatibilityInternal(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]])
       : CompatibilityResult[Option[A]] = {
 
     val compatResult = CompatibilityUtil.resolveCompatibilityResult(
@@ -150,7 +150,7 @@ object OptionSerializer {
     * Once Flink 1.3.x is no longer supported, this can be removed.
     */
   class OptionSerializerConfigSnapshot[A]()
-      extends CompositeTypeSerializerConfigSnapshot {
+      extends CompositeTypeSerializerConfigSnapshot[Option[A]] {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
   }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index b54193b4829..b5d069fbe93 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -152,16 +152,16 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
 
-  override def snapshotConfiguration(): TraversableSerializerConfigSnapshot[E] = {
-    new TraversableSerializerConfigSnapshot[E](elementSerializer)
+  override def snapshotConfiguration(): TraversableSerializerConfigSnapshot[T, E] = {
+    new TraversableSerializerConfigSnapshot[T, E](elementSerializer)
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[T] = {
 
     configSnapshot match {
       case traversableSerializerConfigSnapshot
-          : TraversableSerializerConfigSnapshot[E] =>
+          : TraversableSerializerConfigSnapshot[T, E] =>
 
         val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
           traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index cc9c5ccede7..786c1283938 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -109,21 +109,21 @@ class TrySerializer[A](
   }
 
   override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = {
+      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Try[A]] = {
 
     configSnapshot match {
       case trySerializerConfigSnapshot
           : ScalaTrySerializerConfigSnapshot[A] =>
-        ensureCompatibility(trySerializerConfigSnapshot)
+        ensureCompatibilityInternal(trySerializerConfigSnapshot)
       case legacyTrySerializerConfigSnapshot
           : TrySerializer.TrySerializerConfigSnapshot[A] =>
-        ensureCompatibility(legacyTrySerializerConfigSnapshot)
+        ensureCompatibilityInternal(legacyTrySerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
 
-  private def ensureCompatibility(
-      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+  private def ensureCompatibilityInternal(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Try[A]])
         : CompatibilityResult[Try[A]] = {
 
     val previousSerializersAndConfigs =
@@ -156,7 +156,7 @@ object TrySerializer {
     * Once Flink 1.3.x is no longer supported, this can be removed.
     */
   class TrySerializerConfigSnapshot[A]()
-      extends CompositeTypeSerializerConfigSnapshot() {
+      extends CompositeTypeSerializerConfigSnapshot[Try[A]]() {
 
     override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
   }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
index 7d594fb2b16..3b8331abdf6 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
@@ -57,9 +57,13 @@ class TupleSerializerCompatibilityTest {
       assertTrue(oldSerializer.isInstanceOf[CaseClassSerializer[_]])
       assertTrue(oldConfigSnapshot.isInstanceOf[TupleSerializerConfigSnapshot[_]])
 
+      assertTrue(oldConfigSnapshot.isInstanceOf[TupleSerializerConfigSnapshot[_]])
+
       val currentSerializer = createTypeInformation[TestCaseClass]
         .createSerializer(new ExecutionConfig())
-      assertFalse(currentSerializer.ensureCompatibility(oldConfigSnapshot).isRequiresMigration)
+      assertFalse(currentSerializer
+        .ensureCompatibility(oldConfigSnapshot)
+        .isRequiresMigration)
 
       // test old data serialization
       is.close()
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 8be4debff35..55e2419a97a 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.typeutils
 import java.io._
 import java.net.{URL, URLClassLoader}
 
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerSerializationUtil}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerSnapshotSerializationUtil}
 import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
@@ -134,7 +134,8 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
 
     val baos = new ByteArrayOutputStream()
     val output = new DataOutputViewStreamWrapper(baos)
-    TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(output, snapshot)
+    TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+      output, snapshot, enumValueSerializer)
 
     output.close()
     baos.close()
@@ -144,9 +145,10 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
 
     val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB)
 
-    val snapshot2 = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+    val snapshot2 = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
       input,
-      classLoader2)
+      classLoader2,
+      enumValueSerializer)
     val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
 
     val enumValueSerializer2 = new EnumValueSerializer(enum2)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 42a1e26b8b5..3d66db6e6fc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -695,7 +695,7 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
 			// check for key serializer compatibility; this also reconfigures the
 			// key serializer to be compatible, if it is required and is possible
 			if (CompatibilityUtil.resolveCompatibilityResult(
-				serializationProxy.getKeySerializer(),
+				serializationProxy.restoreKeySerializer(),
 				UnloadableDummyTypeSerializer.class,
 				serializationProxy.getKeySerializerConfigSnapshot(),
 				rocksDBKeyedStateBackend.keySerializer)
@@ -1246,7 +1246,7 @@ private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
 				// check for key serializer compatibility; this also reconfigures the
 				// key serializer to be compatible, if it is required and is possible
 				if (CompatibilityUtil.resolveCompatibilityResult(
-					serializationProxy.getKeySerializer(),
+					serializationProxy.restoreKeySerializer(),
 					UnloadableDummyTypeSerializer.class,
 					serializationProxy.getKeySerializerConfigSnapshot(),
 					stateBackend.keySerializer)
@@ -1582,7 +1582,7 @@ public static RocksIteratorWrapper getRocksIterator(
 			StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
 				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
-			TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.getTypeSerializer(serializerKey);
+			TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
 
 			if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
 				CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 55009e1b4cb..e17673d4141 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -30,6 +30,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
@@ -585,15 +586,15 @@ public boolean canEqual(Object obj) {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
 			return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
 		}
 
 		@Override
-		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 			if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
-					((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
+					((UnionSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 					previousSerializersAndConfigs.get(0).f0,
@@ -624,7 +625,8 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 	/**
 	 * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
 	 */
-	public static class UnionSerializerConfigSnapshot<T1, T2> extends CompositeTypeSerializerConfigSnapshot {
+	public static class UnionSerializerConfigSnapshot<T1, T2>
+			extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
 
 		private static final int VERSION = 1;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 2ffb6d5810e..03f12b585ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -30,6 +30,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -763,16 +764,16 @@ public int hashCode() {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		public TypeSerializerConfigSnapshot<State<TXN, CONTEXT>> snapshotConfiguration() {
 			return new StateSerializerConfigSnapshot<>(transactionSerializer, contextSerializer);
 		}
 
 		@Override
 		public CompatibilityResult<State<TXN, CONTEXT>> ensureCompatibility(
-				TypeSerializerConfigSnapshot configSnapshot) {
+				TypeSerializerConfigSnapshot<?> configSnapshot) {
 			if (configSnapshot instanceof StateSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
-						((StateSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
+						((StateSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<TXN> txnCompatResult = CompatibilityUtil.resolveCompatibilityResult(
 						previousSerializersAndConfigs.get(0).f0,
@@ -809,7 +810,7 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 	 */
 	@Internal
 	public static final class StateSerializerConfigSnapshot<TXN, CONTEXT>
-			extends CompositeTypeSerializerConfigSnapshot {
+			extends CompositeTypeSerializerConfigSnapshot<State<TXN, CONTEXT>> {
 
 		private static final int VERSION = 1;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
index bedd4e05626..3dde6950a1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -33,9 +33,9 @@
 public class InternalTimersSnapshot<K, N> {
 
 	private TypeSerializer<K> keySerializer;
-	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
+	private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
 	private TypeSerializer<N> namespaceSerializer;
-	private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot;
+	private TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot;
 
 	private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers;
 	private Set<TimerHeapInternalTimer<K, N>> processingTimeTimers;
@@ -46,9 +46,9 @@ public InternalTimersSnapshot() {}
 	/** Constructor to use when snapshotting the timers. */
 	public InternalTimersSnapshot(
 			TypeSerializer<K> keySerializer,
-			TypeSerializerConfigSnapshot keySerializerConfigSnapshot,
+			TypeSerializerSnapshot<K> keySerializerConfigSnapshot,
 			TypeSerializer<N> namespaceSerializer,
-			TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot,
+			TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot,
 			@Nullable Set<TimerHeapInternalTimer<K, N>> eventTimeTimers,
 			@Nullable Set<TimerHeapInternalTimer<K, N>> processingTimeTimers) {
 
@@ -68,11 +68,11 @@ public void setKeySerializer(TypeSerializer<K> keySerializer) {
 		this.keySerializer = keySerializer;
 	}
 
-	public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
+	public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
 		return keySerializerConfigSnapshot;
 	}
 
-	public void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot keySerializerConfigSnapshot) {
+	public void setKeySerializerConfigSnapshot(TypeSerializerSnapshot<K> keySerializerConfigSnapshot) {
 		this.keySerializerConfigSnapshot = keySerializerConfigSnapshot;
 	}
 
@@ -84,11 +84,11 @@ public void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) {
 		this.namespaceSerializer = namespaceSerializer;
 	}
 
-	public TypeSerializerConfigSnapshot getNamespaceSerializerConfigSnapshot() {
+	public TypeSerializerSnapshot<N> getNamespaceSerializerConfigSnapshot() {
 		return namespaceSerializerConfigSnapshot;
 	}
 
-	public void setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot) {
+	public void setNamespaceSerializerConfigSnapshot(TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot) {
 		this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index 4f762e24792..b95b0485d1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -284,13 +285,13 @@ protected void restoreKeyAndNamespaceSerializers(
 				InternalTimersSnapshot<K, N> restoredTimersSnapshot,
 				DataInputView in) throws IOException {
 
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
 
 			restoredTimersSnapshot.setKeySerializer((TypeSerializer<K>) serializersAndConfigs.get(0).f0);
-			restoredTimersSnapshot.setKeySerializerConfigSnapshot(serializersAndConfigs.get(0).f1);
+			restoredTimersSnapshot.setKeySerializerConfigSnapshot((TypeSerializerSnapshot<K>) serializersAndConfigs.get(0).f1);
 			restoredTimersSnapshot.setNamespaceSerializer((TypeSerializer<N>) serializersAndConfigs.get(1).f0);
-			restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
+			restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot((TypeSerializerSnapshot<N>) serializersAndConfigs.get(1).f1);
 		}
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index a83cc3a8797..b641c0932af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -208,7 +209,7 @@ public boolean canEqual(Object obj) {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+	public TypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> snapshotConfiguration() {
 		return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
 	}
 
@@ -217,13 +218,13 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 		TypeSerializerConfigSnapshot configSnapshot) {
 
 		if (configSnapshot instanceof TimerSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
-				((TimerSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
+				((TimerSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
 
 			if (previousSerializersAndConfigs.size() == 2) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndSnapshot =
+				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> keySerializerAndSnapshot =
 					previousSerializersAndConfigs.get(KEY_SERIALIZER_SNAPSHOT_INDEX);
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> namespaceSerializerAndSnapshot =
+				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> namespaceSerializerAndSnapshot =
 					previousSerializersAndConfigs.get(NAMESPACE_SERIALIZER_SNAPSHOT_INDEX);
 				CompatibilityResult<K> keyCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
 					keySerializerAndSnapshot.f0,
@@ -262,7 +263,7 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 	 * @param <K> type of key.
 	 * @param <N> type of namespace.
 	 */
-	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot {
+	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> {
 
 		private static final int VERSION = 1;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ed6022ff592..f84ff15a6ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -282,18 +283,18 @@ public int hashCode() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public StreamElementSerializerConfigSnapshot snapshotConfiguration() {
+	public StreamElementSerializerConfigSnapshot<T> snapshotConfiguration() {
 		return new StreamElementSerializerConfigSnapshot<>(typeSerializer);
 	}
 
 	@Override
-	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig;
+	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+		Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousTypeSerializerAndConfig;
 
 		// we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
 		if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
 			previousTypeSerializerAndConfig =
-				((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+				((StreamElementSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
 		} else {
 			return CompatibilityResult.requiresMigration();
 		}
@@ -318,7 +319,7 @@ public StreamElementSerializerConfigSnapshot snapshotConfiguration() {
 	/**
 	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
 	 */
-	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
 
 		private static final int VERSION = 1;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
index 93144964808..fd155c42822 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
@@ -63,7 +63,7 @@
 
 		private transient ListState<String> unionListState;
 
-		CheckpointingNonParallelSourceWithListState(int numElements) {
+		public CheckpointingNonParallelSourceWithListState(int numElements) {
 			this.numElements = numElements;
 		}
 
@@ -121,7 +121,7 @@ public void cancel() {
 
 		private final int numElements;
 
-		CheckingNonParallelSourceWithListState(int numElements) {
+		public CheckingNonParallelSourceWithListState(int numElements) {
 			this.numElements = numElements;
 		}
 
@@ -197,7 +197,7 @@ public void cancel() {
 
 		private transient ListState<String> unionListState;
 
-		CheckpointingParallelSourceWithUnionListState(int numElements) {
+		public CheckpointingParallelSourceWithUnionListState(int numElements) {
 			this.numElements = numElements;
 		}
 
@@ -259,7 +259,7 @@ public void cancel() {
 
 		private final int numElements;
 
-		CheckingParallelSourceWithUnionListState(int numElements) {
+		public CheckingParallelSourceWithUnionListState(int numElements) {
 			this.numElements = numElements;
 		}
 
@@ -316,7 +316,7 @@ public void cancel() {
 	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
 		private static final long serialVersionUID = 1L;
 
-		static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
+		public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
 
 		int count = 0;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
new file mode 100644
index 00000000000..e0922bfdbae
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Migration IT cases for upgrading a legacy {@link TypeSerializerConfigSnapshot}
+ * that is written in checkpoints to the {@link TypeSerializerSnapshot} interface.
+ *
+ * <p>The savepoints used by this test were written with a serializer snapshot class
+ * that extends {@link TypeSerializerConfigSnapshot}, as can be seen in the commented-out
+ * code at the end of this class. On restore, we change the snapshot to directly
+ * implement {@link TypeSerializerSnapshot}.
+ */
+@RunWith(Parameterized.class)
+public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTestBase {
+
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+
+	/**
+	 * This test runs in one of two modes: 1) generating the binary savepoint, in which case
+	 * the checkpointing functions are run; or 2) verifying the restore, in which case the
+	 * checking functions are run.
+	 */
+	public enum ExecutionMode {
+		PERFORM_SAVEPOINT,
+		VERIFY_SAVEPOINT
+	}
+
+	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+	private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
+
+	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+		return Arrays.asList(
+			Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+	}
+
+	private final MigrationVersion testMigrateVersion;
+	private final String testStateBackend;
+
+	public TypeSerializerSnapshotMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+		this.testStateBackend = testMigrateVersionAndBackend.f1;
+	}
+
+	@Test
+	public void testSavepoint() throws Exception {
+		final int parallelism = 1;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		switch (testStateBackend) {
+			case "rocksdb":
+				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+				break;
+			case "jobmanager":
+				env.setStateBackend(new MemoryStateBackend());
+				break;
+			default:
+				throw new UnsupportedOperationException();
+		}
+
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setMaxParallelism(parallelism);
+
+		SourceFunction<Tuple2<Long, Long>> nonParallelSource =
+			new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+
+		env.addSource(nonParallelSource)
+			.keyBy(0)
+			.map(new TestMapFunction())
+			.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
+
+		if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+			executeAndSavepoint(
+				env,
+				"src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
+				new Tuple2<>(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+		} else {
+			restoreAndExecute(
+				env,
+				getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+				new Tuple2<>(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+		}
+	}
+
+	private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
+		switch (backendType) {
+			case "rocksdb":
+				return "type-serializer-snapshot-migration-itcase-flink" + savepointVersion + "-rocksdb-savepoint";
+			case "jobmanager":
+				return "type-serializer-snapshot-migration-itcase-flink" + savepointVersion + "-savepoint";
+			default:
+				throw new UnsupportedOperationException();
+		}
+	}
+
+	private static class TestMapFunction extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			ValueState<Long> state = getRuntimeContext().getState(
+				new ValueStateDescriptor<>("testState", new TestSerializer()));
+
+			state.update(value.f1);
+			return value;
+		}
+	}
+
+	private static class TestSerializer extends TypeSerializer<Long> {
+
+		private static final long serialVersionUID = 1L;
+
+		private LongSerializer serializer = new LongSerializer();
+
+		private String configPayload = "configPayload";
+
+		@Override
+		public TypeSerializerSnapshot<Long> snapshotConfiguration() {
+			return new TestSerializerSnapshot(configPayload);
+		}
+
+		/*
+		@Override
+		public CompatibilityResult<Long> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return (configSnapshot instanceof TestSerializerSnapshot
+				&& ((TestSerializerSnapshot) configSnapshot).configPayload.equals(configPayload))
+				? CompatibilityResult.compatible()
+				: CompatibilityResult.requiresMigration();
+		}
+		*/
+
+		@Override
+		public TypeSerializer<Long> duplicate() {
+			return this;
+		}
+
+		// ------------------------------------------------------------------
+		//  Simple forwarded serializer methods
+		// ------------------------------------------------------------------
+
+		@Override
+		public void serialize(Long record, DataOutputView target) throws IOException {
+			serializer.serialize(record, target);
+		}
+
+		@Override
+		public Long deserialize(Long reuse, DataInputView source) throws IOException {
+			return serializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public Long deserialize(DataInputView source) throws IOException {
+			return serializer.deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			serializer.copy(source, target);
+		}
+
+		@Override
+		public Long copy(Long from) {
+			return serializer.copy(from);
+		}
+
+		@Override
+		public Long copy(Long from, Long reuse) {
+			return serializer.copy(from, reuse);
+		}
+
+		@Override
+		public Long createInstance() {
+			return serializer.createInstance();
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return serializer.isImmutableType();
+		}
+
+		@Override
+		public int getLength() {
+			return serializer.getLength();
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestSerializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+	}
+
+	public static class TestSerializerSnapshot implements TypeSerializerSnapshot<Long> {
+
+		private String configPayload;
+
+		public TestSerializerSnapshot() {}
+
+		public TestSerializerSnapshot(String configPayload) {
+			this.configPayload = configPayload;
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 1;
+		}
+
+		@Override
+		public <NS extends TypeSerializer<Long>> TypeSerializerSchemaCompatibility<Long, NS> resolveSchemaCompatibility(NS newSerializer) {
+			return (newSerializer instanceof TestSerializer)
+				? TypeSerializerSchemaCompatibility.compatibleAsIs()
+				: TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		@Override
+		public TypeSerializer<Long> restoreSerializer() {
+			return new TestSerializer();
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeUTF(configPayload);
+		}
+
+		@Override
+		public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			if (readVersion != 1) {
+				throw new IllegalStateException("Can not recognize read version: " + readVersion);
+			}
+
+			this.configPayload = in.readUTF();
+		}
+	}
+
+	/**
+	 * This was the implementation of {@link TestSerializerSnapshot} when the savepoints
+	 * were written.
+	 */
+	/*
+	public static class TestSerializerSnapshot extends TypeSerializerConfigSnapshot {
+
+		private String configPayload;
+
+		public TestSerializerSnapshot() {}
+
+		public TestSerializerSnapshot(String configPayload) {
+			this.configPayload = configPayload;
+		}
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(configPayload);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			this.configPayload = in.readUTF();
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestSerializerSnapshot;
+		}
+	}
+	*/
+}
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 00000000000..a390e539b4b
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-savepoint/_metadata
new file mode 100644
index 00000000000..6f4b3648186
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.3-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
new file mode 100644
index 00000000000..459e7d690e6
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-savepoint/_metadata
new file mode 100644
index 00000000000..557fad86f5c
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.4-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
new file mode 100644
index 00000000000..29d4b42954d
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-savepoint/_metadata
new file mode 100644
index 00000000000..b2378414756
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.5-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
new file mode 100644
index 00000000000..2340144e92b
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-savepoint/_metadata b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-savepoint/_metadata
new file mode 100644
index 00000000000..facdda6361a
Binary files /dev/null and b/flink-tests/src/test/resources/type-serializer-snapshot-migration-itcase-flink1.6-savepoint/_metadata differ


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services