You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/08 16:51:13 UTC

[GitHub] asfgit closed pull request #7422: [FLINK-11073] (part 2) Introduce CompositeTypeSerializerSnapshot and migrate existing composite serializers' snapshots

asfgit closed pull request #7422: [FLINK-11073] (part 2) Introduce CompositeTypeSerializerSnapshot and migrate existing composite serializers' snapshots
URL: https://github.com/apache/flink/pull/7422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..c73e24c0897
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link CompositeTypeSerializerSnapshot} is a convenient serializer snapshot class that can be used by
+ * simple serializers which 1) delegate their serialization to multiple nested serializers, and 2) may contain
+ * some extra static information that needs to be persisted as part of their snapshot.
+ *
+ * <p>Examples for this would be the {@link ListSerializer}, {@link MapSerializer}, {@link EitherSerializer}, etc.,
+ * in which case the serializer, called the "outer" serializer in this context, has only some nested serializers that
+ * need to be persisted as its snapshot, and nothing else that needs to be persisted as the "outer" snapshot.
+ * An example which has a non-empty outer snapshot would be the {@link GenericArraySerializer}, which beyond the
+ * nested component serializer, also contains a class of the component type that needs to be persisted.
+ *
+ * <p>Serializers that do have some outer snapshot need to make sure to implement the methods
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
+ * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for their serializer snapshot
+ * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
+ * subclasses do not have any outer snapshot that needs to be persisted.
+ *
+ * <h2>Snapshot Versioning</h2>
+ *
+ * <p>This base class has its own versioning for the format in which it writes the outer snapshot and the
+ * nested serializer snapshots. The version of the serialization format of this base class is defined
+ * by {@link #getCurrentVersion()}. This is independent of the version in which subclasses write their outer snapshot,
+ * defined by {@link #getCurrentOuterSnapshotVersion()}.
+ * This means that the outer snapshot's version can be maintained only taking into account changes in how the
+ * outer snapshot is written. Any changes in the base format do not require upticks in the outer snapshot's version.
+ *
+ * <h2>Serialization Format</h2>
+ *
+ * <p>The current version of the serialization format of a {@link CompositeTypeSerializerSnapshot} is as follows:
+ *
+ * <pre>{@code
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | CompositeTypeSerializerSnapshot | CompositeTypeSerializerSnapshot |          Outer snapshot         |
+ * |           version               |          MAGIC_NUMBER           |              version            |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                               Outer snapshot                                        |
+ * |                                   #writeOuterSnapshot(DataOutputView out)                           |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |      Delegate MAGIC_NUMBER      |         Delegate version        |     Num. nested serializers     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                     Nested serializer snapshots                                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }</pre>
+ *
+ * @param <T> The data type that the originating serializer of this snapshot serializes.
+ * @param <S> The type of the originating serializer.
+ */
+@PublicEvolving
+public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer> implements TypeSerializerSnapshot<T> {
+
+	/** Magic number for integrity checks during deserialization. */
+	private static final int MAGIC_NUMBER = 911108;
+
+	/**
+	 * Current version of the base serialization format.
+	 *
+	 * <p>NOTE: We start from version 3. This version is represented by the {@link #getCurrentVersion()} method.
+	 * Previously, this method was used to represent the outer snapshot's version (now, represented
+	 * by the {@link #getCurrentOuterSnapshotVersion()} method).
+	 *
+	 * <p>To bridge this transition, we set the starting version of the base format to be
+	 * larger than the highest version of previously defined values in implementing subclasses,
+	 * which was {@link #HIGHEST_LEGACY_READ_VERSION}. This allows us to identify legacy deserialization paths,
+	 * which did not contain versioning for the base format, simply by checking if the read
+	 * version of the snapshot is smaller than or equal to {@link #HIGHEST_LEGACY_READ_VERSION}.
+	 */
+	private static final int VERSION = 3;
+
+	private static final int HIGHEST_LEGACY_READ_VERSION = 2;
+
+	private NestedSerializersSnapshotDelegate nestedSerializersSnapshotDelegate;
+
+	private final Class<S> correspondingSerializerClass;
+
+	/**
+	 * Constructor to be used for read instantiation.
+	 *
+	 * @param correspondingSerializerClass the expected class of the new serializer.
+	 */
+	public CompositeTypeSerializerSnapshot(Class<S> correspondingSerializerClass) {
+		this.correspondingSerializerClass = Preconditions.checkNotNull(correspondingSerializerClass);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot.
+	 *
+	 * @param serializerInstance an instance of the originating serializer of this snapshot.
+	 */
+	@SuppressWarnings("unchecked")
+	public CompositeTypeSerializerSnapshot(S serializerInstance) {
+		Preconditions.checkNotNull(serializerInstance);
+		this.nestedSerializersSnapshotDelegate = new NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance));
+		this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass();
+	}
+
+	@Override
+	public final int getCurrentVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public final void writeSnapshot(DataOutputView out) throws IOException {
+		internalWriteOuterSnapshot(out);
+		nestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(out);
+	}
+
+	@Override
+	public final void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		if (readVersion > HIGHEST_LEGACY_READ_VERSION) {
+			internalReadOuterSnapshot(in, userCodeClassLoader);
+		} else {
+			legacyInternalReadOuterSnapshot(readVersion, in, userCodeClassLoader);
+		}
+		this.nestedSerializersSnapshotDelegate = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, userCodeClassLoader);
+	}
+
+	@Override
+	public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+		if (newSerializer.getClass() != correspondingSerializerClass) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
+
+		// check that outer configuration is compatible; if not, short circuit result
+		if (!isOuterSnapshotCompatible(castedNewSerializer)) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
+		return constructFinalSchemaCompatibilityResult(
+			getNestedSerializers(castedNewSerializer),
+			nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+	}
+
+	@Override
+	public final TypeSerializer<T> restoreSerializer() {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> serializer = (TypeSerializer<T>)
+			createOuterSerializerWithNestedSerializers(nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
+
+		return serializer;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer serializer access methods
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the version of the current outer snapshot's written binary format.
+	 *
+	 * @return the version of the current outer snapshot's written binary format.
+	 */
+	protected abstract int getCurrentOuterSnapshotVersion();
+
+	/**
+	 * Gets the nested serializers from the outer serializer.
+	 *
+	 * @param outerSerializer the outer serializer.
+	 *
+	 * @return the nested serializers.
+	 */
+	protected abstract TypeSerializer<?>[] getNestedSerializers(S outerSerializer);
+
+	/**
+	 * Creates an instance of the outer serializer with a given array of its nested serializers.
+	 *
+	 * @param nestedSerializers array of nested serializers to create the outer serializer with.
+	 *
+	 * @return an instance of the outer serializer.
+	 */
+	protected abstract S createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers);
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer snapshot methods; need to be overridden if outer snapshot is not empty,
+	//  or in other words, the outer serializer has extra configuration beyond its nested serializers.
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Writes the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this method writes nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that needs to be persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * need to be implemented.
+	 *
+	 * @param out the {@link DataOutputView} to write the outer snapshot to.
+	 */
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {}
+
+	/**
+	 * Reads the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this method reads nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * need to be implemented.
+	 *
+	 * @param readOuterSnapshotVersion the read version of the outer snapshot.
+	 * @param in the {@link DataInputView} to read the outer snapshot from.
+	 * @param userCodeClassLoader the user code class loader.
+	 */
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {}
+
+	/**
+	 * Checks whether the outer snapshot is compatible with a given new serializer.
+	 *
+	 * <p>The base implementation of this method just returns {@code true}, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information, and therefore the result of the check must always
+	 * be true. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+	 * need to be implemented.
+	 *
+	 * @param newSerializer the new serializer, which contains the new outer information to check against.
+	 *
+	 * @return a flag indicating whether or not the new serializer's outer information is compatible with the one
+	 *         written in this snapshot.
+	 */
+	protected boolean isOuterSnapshotCompatible(S newSerializer) {
+		return true;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------------------------
+
+	private void internalWriteOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeInt(MAGIC_NUMBER);
+		out.writeInt(getCurrentOuterSnapshotVersion());
+
+		writeOuterSnapshot(out);
+	}
+
+	private void internalReadOuterSnapshot(DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		final int magicNumber = in.readInt();
+		if (magicNumber != MAGIC_NUMBER) {
+			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
+				MAGIC_NUMBER, magicNumber));
+		}
+
+		final int outerSnapshotVersion = in.readInt();
+		readOuterSnapshot(outerSnapshotVersion, in, userCodeClassLoader);
+	}
+
+	private void legacyInternalReadOuterSnapshot(
+			int legacyReadVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+
+		// legacy versions did not contain the prefixed magic numbers; just read the outer snapshot
+		readOuterSnapshot(legacyReadVersion, in, userCodeClassLoader);
+	}
+
+	private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
+			TypeSerializer<?>[] newNestedSerializers,
+			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+
+		Preconditions.checkArgument(newNestedSerializers.length == nestedSerializerSnapshots.length,
+			"Different number of new serializers and existing serializer snapshots.");
+
+		TypeSerializer<?>[] reconfiguredNestedSerializers = new TypeSerializer[newNestedSerializers.length];
+
+		// check nested serializers for compatibility
+		boolean nestedSerializerRequiresMigration = false;
+		boolean hasReconfiguredNestedSerializers = false;
+		for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
+			TypeSerializerSchemaCompatibility<?> compatibility =
+				resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);
+
+			// if any one of the new nested serializers is incompatible, we can just short circuit the result
+			if (compatibility.isIncompatible()) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+
+			if (compatibility.isCompatibleAfterMigration()) {
+				nestedSerializerRequiresMigration = true;
+			} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
+				hasReconfiguredNestedSerializers = true;
+				reconfiguredNestedSerializers[i] = compatibility.getReconfiguredSerializer();
+			} else if (compatibility.isCompatibleAsIs()) {
+				reconfiguredNestedSerializers[i] = newNestedSerializers[i];
+			} else {
+				throw new IllegalStateException("Undefined compatibility type.");
+			}
+		}
+
+		if (nestedSerializerRequiresMigration) {
+			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+		}
+
+		if (hasReconfiguredNestedSerializers) {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(reconfiguredNestedSerializers);
+			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
+		}
+
+		// ends up here if everything is compatible as is
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
+		TypeSerializer<?> serializer,
+		TypeSerializerSnapshot<?> snapshot) {
+
+		TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
+		TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;
+
+		return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
similarity index 79%
rename from flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
rename to flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
index 93f5a703257..a4dcdd2cda9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,21 +30,21 @@
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A CompositeSerializerSnapshot represents the snapshots of multiple serializers that are used
+ * A NestedSerializersSnapshotDelegate represents the snapshots of multiple serializers that are used
  * by an outer serializer. Examples would be tuples, where the outer serializer is the tuple
- * format serializer, an the CompositeSerializerSnapshot holds the serializers for the
+ * format serializer, and the NestedSerializersSnapshotDelegate holds the serializers for the
  * different tuple fields.
  *
- * <p>The CompositeSerializerSnapshot does not implement the {@link TypeSerializerSnapshot} interface.
+ * <p>The NestedSerializersSnapshotDelegate does not implement the {@link TypeSerializerSnapshot} interface.
  * It is not meant to be inherited from, but to be composed with a serializer snapshot implementation.
  *
- * <p>The CompositeSerializerSnapshot has its own versioning internally, it does not couple its
+ * <p>The NestedSerializersSnapshotDelegate has its own versioning internally, it does not couple its
  * versioning to the versioning of the TypeSerializerSnapshot that builds on top of this class.
- * That way, the CompositeSerializerSnapshot and enclosing TypeSerializerSnapshot the can evolve
+ * That way, the NestedSerializersSnapshotDelegate and the enclosing TypeSerializerSnapshot can evolve
  * their formats independently.
  */
-@PublicEvolving
-public class CompositeSerializerSnapshot {
+@Internal
+public class NestedSerializersSnapshotDelegate {
 
 	/** Magic number for integrity checks during deserialization. */
 	private static final int MAGIC_NUMBER = 1333245;
@@ -58,14 +58,14 @@
 	/**
 	 * Constructor to create a snapshot for writing.
 	 */
-	public CompositeSerializerSnapshot(TypeSerializer<?>... serializers) {
+	public NestedSerializersSnapshotDelegate(TypeSerializer<?>... serializers) {
 		this.nestedSnapshots = TypeSerializerUtils.snapshotBackwardsCompatible(serializers);
 	}
 
 	/**
 	 * Constructor to create a snapshot during deserialization.
 	 */
-	private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
+	private NestedSerializersSnapshotDelegate(TypeSerializerSnapshot<?>[] snapshots) {
 		this.nestedSnapshots = snapshots;
 	}
 
@@ -77,14 +77,14 @@ private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
 	 * Produces a restore serializer from each contained serializer configuration snapshot.
 	 * The serializers are returned in the same order as the snapshots are stored.
 	 */
-	public TypeSerializer<?>[] getRestoreSerializers() {
+	public TypeSerializer<?>[] getRestoredNestedSerializers() {
 		return snapshotsToRestoreSerializers(nestedSnapshots);
 	}
 
 	/**
 	 * Creates the restore serializer from the pos-th config snapshot.
 	 */
-	public <T> TypeSerializer<T> getRestoreSerializer(int pos) {
+	public <T> TypeSerializer<T> getRestoredNestedSerializer(int pos) {
 		checkArgument(pos < nestedSnapshots.length);
 
 		@SuppressWarnings("unchecked")
@@ -93,10 +93,23 @@ private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
 		return snapshot.restoreSerializer();
 	}
 
+	/**
+	 * Returns the snapshots of the nested serializers.
+	 *
+	 * @return the snapshots of the nested serializers.
+	 */
+	public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+		return nestedSnapshots;
+	}
+
 	/**
 	 * Resolves the compatibility of the nested serializer snapshots with the nested
 	 * serializers of the new outer serializer.
+	 *
+	 * @deprecated this method will be removed in the future. Resolving compatibility for nested
+	 *             serializers is now handled by {@link CompositeTypeSerializerSnapshot}.
 	 */
+	@Deprecated
 	public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
 			TypeSerializerSchemaCompatibility<?> outerCompatibility,
 			TypeSerializer<?>... newNestedSerializers) {
@@ -135,7 +148,7 @@ private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
 	/**
 	 * Writes the composite snapshot of all the contained serializers.
 	 */
-	public final void writeCompositeSnapshot(DataOutputView out) throws IOException {
+	public final void writeNestedSerializerSnapshots(DataOutputView out) throws IOException {
 		out.writeInt(MAGIC_NUMBER);
 		out.writeInt(VERSION);
 
@@ -148,7 +161,7 @@ public final void writeCompositeSnapshot(DataOutputView out) throws IOException
 	/**
 	 * Reads the composite snapshot of all the contained serializers.
 	 */
-	public static CompositeSerializerSnapshot readCompositeSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		final int magicNumber = in.readInt();
 		if (magicNumber != MAGIC_NUMBER) {
 			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
@@ -167,14 +180,14 @@ public static CompositeSerializerSnapshot readCompositeSnapshot(DataInputView in
 			nestedSnapshots[i] = TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
 		}
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	/**
 	 * Reads the composite snapshot of all the contained serializers in a way that is compatible
 	 * with Version 1 of the deprecated {@link CompositeTypeSerializerConfigSnapshot}.
 	 */
-	public static CompositeSerializerSnapshot legacyReadProductSnapshots(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate legacyReadNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		@SuppressWarnings("deprecation")
 		final List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndSnapshots =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, cl);
@@ -183,7 +196,7 @@ public static CompositeSerializerSnapshot legacyReadProductSnapshots(DataInputVi
 				.map(t -> t.f1)
 				.toArray(TypeSerializerSnapshot<?>[]::new);
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 377dd4cc198..762a4410c85 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -49,10 +49,13 @@ public CollectionSerializerConfigSnapshot(TypeSerializer<T> elementSerializer) {
 	@Override
 	public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
 		if (newSerializer instanceof ListSerializer) {
-			ListSerializerSnapshot<T> listSerializerSnapshot =
-				new ListSerializerSnapshot<>(((ListSerializer<T>) newSerializer).getElementSerializer());
+			ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
+			ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
 
-			return listSerializerSnapshot.resolveSchemaCompatibility((ListSerializer) newSerializer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
+				listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
+			return result;
 		} else {
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab477f..a4949fb6fa7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public String toString() {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
-		return new GenericArraySerializerConfigSnapshot<>(this);
+	public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+		return new GenericArraySerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa24128d2..cfc2e9825d7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -38,8 +38,12 @@
  * Point-in-time configuration of a {@link GenericArraySerializer}.
  *
  * @param <C> The component type.
+ *
+ * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
+ *             It has been replaced by {@link GenericArraySerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
 
 	private static final int CURRENT_VERSION = 2;
@@ -50,7 +54,7 @@
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -63,7 +67,7 @@ public GenericArraySerializerConfigSnapshot() {}
 	 */
 	public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> serializer) {
 		this.componentClass = serializer.getComponentClass();
-		this.nestedSnapshot = new CompositeSerializerSnapshot(serializer.getComponentSerializer());
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
 	}
 
 	// ------------------------------------------------------------------------
@@ -77,7 +81,7 @@ public int getCurrentVersion() {
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(componentClass != null && nestedSnapshot != null);
 		out.writeUTF(componentClass.getName());
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -95,7 +99,7 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoa
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 
 		try (DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
 			componentClass = InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
@@ -107,29 +111,23 @@ private void readV1(DataInputView in, ClassLoader classLoader) throws IOExceptio
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
 		componentClass = InstantiationUtil.resolveClassByName(in, classLoader);
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<C[]> restoreSerializer() {
 		checkState(componentClass != null && nestedSnapshot != null);
-		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoreSerializer(0));
+		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
 	}
 
 	@Override
 	public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-		checkState(componentClass != null && nestedSnapshot != null);
-
 		if (newSerializer instanceof GenericArraySerializer) {
-			GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
-			TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
-					TypeSerializerSchemaCompatibility.compatibleAsIs() :
-					TypeSerializerSchemaCompatibility.incompatible();
-
-			return nestedSnapshot.resolveCompatibilityWithNested(
-					compat, serializer.getComponentSerializer());
-		}
-		else {
+			// delegate the compatibility check to the new snapshot class
+			GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
+			GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
+			return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
+		} else {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 00000000000..3f54dee67cb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Point-in-time configuration of a {@link GenericArraySerializer}.
+ *
+ * @param <C> The component type.
+ */
+public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	private Class<C> componentClass;
+
+	/**
+	 * Constructor to be used for read instantiation.
+	 */
+	public GenericArraySerializerSnapshot() {
+		super(GenericArraySerializer.class);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot.
+	 */
+	public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
+		super(genericArraySerializer);
+		this.componentClass = genericArraySerializer.getComponentClass();
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeUTF(componentClass.getName());
+	}
+
+	@Override
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
+	}
+
+	@Override
+	protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
+		return new GenericArraySerializer<>(componentClass, componentSerializer);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 08b33334653..51780319036 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -175,6 +175,6 @@ public int hashCode() {
 
 	@Override
 	public TypeSerializerSnapshot<List<T>> snapshotConfiguration() {
-		return new ListSerializerSnapshot<>(elementSerializer);
+		return new ListSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 5f89d94973f..f90e22a651a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -18,73 +18,45 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ListSerializer}.
  */
-public class ListSerializerSnapshot<T> implements TypeSerializerSnapshot<List<T>> {
+public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ListSerializerSnapshot() {}
+	public ListSerializerSnapshot() {
+		super(ListSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	public ListSerializerSnapshot(ListSerializer<T> listSerializer) {
+		super(listSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<List<T>> restoreSerializer() {
-		return new ListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<List<T>> resolveSchemaCompatibility(TypeSerializer<List<T>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof ListSerializer) {
-			ListSerializer<T> serializer = (ListSerializer<T>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+		return new ListSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ListSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index dd3b81bade7..bedaf693608 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -202,6 +202,6 @@ public int hashCode() {
 
 	@Override
 	public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
-		return new MapSerializerSnapshot<>(keySerializer, valueSerializer);
+		return new MapSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 000924f0245..2b78b527f7e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -50,8 +50,7 @@ public MapSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializ
 			// redirect the compatibility check to the new MapSerializerSnapshot
 			MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
 
-			MapSerializerSnapshot<K, V> mapSerializerSnapshot =
-				new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
+			MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
 			return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
 		}
 		else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index be2e4b0cbe6..a6db0ef74e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -18,78 +18,50 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link MapSerializer}.
  */
-public class MapSerializerSnapshot<K, V> implements TypeSerializerSnapshot<Map<K, V>> {
+public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapSerializerSnapshot() {}
+	public MapSerializerSnapshot() {
+		super(MapSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
-		Preconditions.checkNotNull(keySerializer);
-		Preconditions.checkNotNull(valueSerializer);
-		this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
+	public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
+		super(mapSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Map<K, V>> restoreSerializer() {
-		return new MapSerializer<>(
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(0),
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
-	}
+	protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
 
-	@Override
-	public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
-		checkState(nestedKeyValueSerializerSnapshot != null);
+		@SuppressWarnings("unchecked")
+		TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
 
-		if (newSerializer instanceof MapSerializer) {
-			MapSerializer<K, V> serializer = (MapSerializer<K, V>) newSerializer;
-
-			return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getKeySerializer(),
-				serializer.getValueSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
+		return new MapSerializer<>(keySerializer, valueSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 3d4e8e92762..01286400694 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -205,7 +205,7 @@ public int hashCode() {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public EitherSerializerSnapshot<L, R> snapshotConfiguration() {
-		return new EitherSerializerSnapshot<>(leftSerializer, rightSerializer);
+	public JavaEitherSerializerSnapshot<L, R> snapshotConfiguration() {
+		return new JavaEitherSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 016fd0430f7..1779ec8e744 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -35,15 +35,19 @@
 
 /**
  * Configuration snapshot for the {@link EitherSerializer}.
+ *
+ * @deprecated this snapshot class is no longer used by any serializers.
+ *             Instead, {@link JavaEitherSerializerSnapshot} is used.
  */
 @Internal
+@Deprecated
 public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
 
 	private static final int CURRENT_VERSION = 2;
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -58,7 +62,7 @@ public EitherSerializerSnapshot(
 			TypeSerializer<L> leftSerializer,
 			TypeSerializer<R> rightSerializer) {
 
-		this.nestedSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(leftSerializer, rightSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -71,7 +75,7 @@ public int getCurrentVersion() {
 	@Override
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(nestedSnapshot != null);
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -89,19 +93,19 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoa
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 	}
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<Either<L, R>> restoreSerializer() {
 		checkState(nestedSnapshot != null);
 		return new EitherSerializer<>(
-				nestedSnapshot.getRestoreSerializer(0),
-				nestedSnapshot.getRestoreSerializer(1));
+				nestedSnapshot.getRestoredNestedSerializer(0),
+				nestedSnapshot.getRestoredNestedSerializer(1));
 	}
 
 	@Override
@@ -110,12 +114,10 @@ private void readV2(DataInputView in, ClassLoader classLoader) throws IOExceptio
 		checkState(nestedSnapshot != null);
 
 		if (newSerializer instanceof EitherSerializer) {
+			// delegate compatibility check to the new snapshot class
 			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
-
-			return nestedSnapshot.resolveCompatibilityWithNested(
-					TypeSerializerSchemaCompatibility.compatibleAsIs(),
-					serializer.getLeftSerializer(),
-					serializer.getRightSerializer());
+			JavaEitherSerializerSnapshot<L, R> newSnapshot = new JavaEitherSerializerSnapshot<>(serializer);
+			return newSnapshot.resolveSchemaCompatibility(serializer);
 		}
 		else {
 			return TypeSerializerSchemaCompatibility.incompatible();
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
new file mode 100644
index 00000000000..503634599f8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Either;
+
+/**
+ * Snapshot class for the {@link EitherSerializer}.
+ */
+public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	@SuppressWarnings("unused")
+	public JavaEitherSerializerSnapshot() {
+		super(EitherSerializer.class);
+	}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public JavaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+		super(eitherSerializer);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		return new EitherSerializer<>(nestedSerializers[0], nestedSerializers[1]);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+		return new TypeSerializer<?>[]{ outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a21ec..62135d7e550 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
-import org.apache.flink.api.java.typeutils.runtime.EitherSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
 import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
@@ -48,14 +48,14 @@ public CompositeTypeSerializerSnapshotMigrationTest(TestSpecification<Object> te
 
 		// Either<String, Integer>
 
-		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, EitherSerializerSnapshot.class)
+		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class)
 			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);
 
 		// GenericArray<String>
 
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
 			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
new file mode 100644
index 00000000000..0f77e3d7376
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test suite for the {@link CompositeTypeSerializerSnapshot}.
+ */
+public class CompositeTypeSerializerSnapshotTest {
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#resolveSchemaCompatibility
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testIncompatiblePrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		final TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER)
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	@Test
+	public void testCompatibleAfterMigrationPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAfterMigration());
+	}
+
+	@Test
+	public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
+
+		TestCompositeTypeSerializer reconfiguredSerializer =
+			(TestCompositeTypeSerializer) compatibility.getReconfiguredSerializer();
+		TypeSerializer<?>[] reconfiguredNestedSerializers = reconfiguredSerializer.getNestedSerializers();
+		// only the nested serializer at index 1 should be a ReconfiguredNestedSerializer; the others must remain plain NestedSerializers
+		Assert.assertTrue(reconfiguredNestedSerializers[0].getClass() == NestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[1].getClass() == ReconfiguredNestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[2].getClass() == NestedSerializer.class);
+	}
+
+	@Test
+	public void testCompatibleAsIsPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAsIs());
+	}
+
+	@Test
+	public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
+		final String INIT_OUTER_CONFIG = "outer-config";
+		final String INCOMPAT_OUTER_CONFIG = "incompat-outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				INIT_OUTER_CONFIG,
+				INCOMPAT_OUTER_CONFIG);
+
+		// even though nested serializers are compatible, incompatibility of the outer
+		// snapshot should have higher precedence in the final result
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+			TypeSerializer<?>[] testNestedSerializers,
+			String initialOuterConfiguration,
+			String newOuterConfiguration) throws IOException {
+		TestCompositeTypeSerializer testSerializer =
+			new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		TestCompositeTypeSerializer newTestSerializer =
+			new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers);
+		return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#restoreSerializer
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testRestoreCompositeTypeSerializer() throws IOException {
+		// the target compatibilities of the nested serializers don't matter,
+		// because we're only testing the restoring of the serializer
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
+		};
+
+		TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		// now, restore the composite type serializer;
+		// every restored nested serializer should be a RestoredNestedSerializer
+		testSerializer = (TestCompositeTypeSerializer) testSerializerSnapshot.restoreSerializer();
+		Assert.assertTrue(testSerializer.getNestedSerializers()[0].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[1].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[2].getClass() == RestoredNestedSerializer.class);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Test utilities
+	// ------------------------------------------------------------------------------------------------
+
+	/**
+	 * A simple composite serializer used for testing.
+	 * It can be configured with an array of nested serializers, as well as an outer configuration (represented as a String).
+	 */
+	public static class TestCompositeTypeSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -545688468997398105L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final String outerConfiguration;
+
+		private final TypeSerializer<?>[] nestedSerializers;
+
+		TestCompositeTypeSerializer(
+				String outerConfiguration,
+				TypeSerializer<?>[] nestedSerializers) {
+			this.outerConfiguration = outerConfiguration;
+			this.nestedSerializers = nestedSerializers;
+		}
+
+		public String getOuterConfiguration() {
+			return outerConfiguration;
+		}
+
+		TypeSerializer<?>[] getNestedSerializers() {
+			return nestedSerializers;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new TestCompositeTypeSerializerSnapshot(this);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return Arrays.equals(nestedSerializers, ((TestCompositeTypeSerializer) obj).getNestedSerializers());
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(nestedSerializers);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestCompositeTypeSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot class for the {@link TestCompositeTypeSerializer}.
+	 */
+	public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {
+
+		private String outerConfiguration;
+
+		public TestCompositeTypeSerializerSnapshot() {
+			super(TestCompositeTypeSerializer.class);
+		}
+
+		TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
+			super(serializer);
+			this.outerConfiguration = serializer.getOuterConfiguration();
+		}
+
+		@Override
+		protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(TestCompositeTypeSerializer outerSerializer) {
+			return outerSerializer.getNestedSerializers();
+		}
+
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeUTF(outerConfiguration);
+		}
+
+		@Override
+		public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
+			this.outerConfiguration = in.readUTF();
+		}
+
+		@Override
+		protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
+			return outerConfiguration.equals(newSerializer.getOuterConfiguration());
+		}
+
+		@Override
+		public int getCurrentOuterSnapshotVersion() {
+			return 1;
+		}
+	}
+
+	public enum TargetCompatibility {
+		COMPATIBLE_AS_IS,
+		COMPATIBLE_AFTER_MIGRATION,
+		COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+		INCOMPATIBLE
+	}
+
+	/**
+	 * Used as nested serializers in the test composite serializer.
+	 * A nested serializer can be configured with a {@link TargetCompatibility},
+	 * which indicates what the result of the schema compatibility check should be
+	 * when a new instance of it is being checked for compatibility.
+	 */
+	public static class NestedSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -6175000932620623446L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final TargetCompatibility targetCompatibility;
+
+		NestedSerializer(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new NestedSerializerSnapshot(targetCompatibility);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return targetCompatibility == ((NestedSerializer) obj).targetCompatibility;
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return targetCompatibility.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof NestedSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot of the {@link NestedSerializer}.
+	 */
+	public static class NestedSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+		private TargetCompatibility targetCompatibility;
+
+		public NestedSerializerSnapshot() {}
+
+		public NestedSerializerSnapshot(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public void writeSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(targetCompatibility.ordinal());
+		}
+
+		@Override
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			this.targetCompatibility = TargetCompatibility.values()[in.readInt()];
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+			// checks the exact class instead of using instanceof;
+			// this ensures that the new serializer is a plain NestedSerializer, and not a ReconfiguredNestedSerializer or RestoredNestedSerializer
+			if (newSerializer.getClass() == NestedSerializer.class) {
+				switch (targetCompatibility) {
+					case COMPATIBLE_AS_IS:
+						return TypeSerializerSchemaCompatibility.compatibleAsIs();
+					case COMPATIBLE_AFTER_MIGRATION:
+						return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+					case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
+						return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+							new ReconfiguredNestedSerializer(targetCompatibility));
+					case INCOMPATIBLE:
+						return TypeSerializerSchemaCompatibility.incompatible();
+					default:
+						throw new IllegalStateException("Unexpected target compatibility.");
+				}
+			}
+
+			throw new IllegalArgumentException("Expected the new serializer to be of class " + NestedSerializer.class);
+		}
+
+		@Override
+		public TypeSerializer<String> restoreSerializer() {
+			return new RestoredNestedSerializer(targetCompatibility);
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 1;
+		}
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a reconfigured instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class ReconfiguredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public ReconfiguredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a restored instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class RestoredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public RestoredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 186f50482af..ea18309ac98 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -103,6 +103,8 @@ public void movingForward() throws IOException {
 		TypeSerializer<ElementT> restoredSerializer = previousSnapshot.restoreSerializer();
 
 		TypeSerializerSnapshot<ElementT> nextSnapshot = restoredSerializer.snapshotConfiguration();
+		assertThat(nextSnapshot, instanceOf(testSpecification.snapshotClass));
+
 		TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot);
 
 		assertThat(nextSnapshotDeserialized, allOf(
@@ -247,6 +249,10 @@ private Path getSnapshotDataLocation() {
 			return resourcePath(this.snapshotDataLocation);
 		}
 
+		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
+			return snapshotClass;
+		}
+
 		@Override
 		public String toString() {
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index 7afbc508230..ae1452bf672 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -187,7 +187,7 @@ public boolean canEqual(Object obj) {
 
 		@Override
 		public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
-			return new LockableTypeSerializerSnapshot<>(elementSerializer);
+			return new LockableTypeSerializerSnapshot<>(this);
 		}
 
 		/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 44a4670cc1d..13867aca0c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -19,74 +19,46 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
  */
 @Internal
-public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {
+public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public LockableTypeSerializerSnapshot() {}
+	public LockableTypeSerializerSnapshot() {
+		super(Lockable.LockableTypeSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	public LockableTypeSerializerSnapshot(Lockable.LockableTypeSerializer<E> lockableTypeSerializer) {
+		super(lockableTypeSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Lockable<E>> restoreSerializer() {
-		return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+	protected Lockable.LockableTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<E> elementSerializer = (TypeSerializer<E>) nestedSerializers[0];
+		return new Lockable.LockableTypeSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof Lockable.LockableTypeSerializer) {
-			Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
+	protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
-	}
-
-	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
-	}
-
 }
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index cca84d28d17..90468acf8c3 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,76 +18,50 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.ListView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
  *
  * @param <T> the type of the list elements.
  */
-public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedListSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ListViewSerializerSnapshot() {}
+	public ListViewSerializerSnapshot() {
+		super(ListViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
-		this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+	public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
+		super(listViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ListView<T>> restoreSerializer() {
-		return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
-		checkState(nestedListSerializerSnapshot != null);
-
-		if (newSerializer instanceof ListViewSerializer) {
-			ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
-
-			return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getListSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
+		return new ListViewSerializer<>(listSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index f59fc0a3654..132f42f3320 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,78 +18,51 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.MapView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
  *
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
-public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapViewSerializerSnapshot() {}
+	public MapViewSerializerSnapshot() {
+		super(MapViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
-		this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+	public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
+		super(mapViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<MapView<K, V>> restoreSerializer() {
-		return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
-			TypeSerializer<MapView<K, V>> newSerializer) {
-		checkState(nestedMapSerializerSnapshot != null);
-
-		if (newSerializer instanceof MapViewSerializer) {
-			MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
-
-			return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getMapSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+	protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
+		return new MapViewSerializer<>(mapSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 246af6c0dab..2d48c3daf16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -77,7 +77,7 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
   override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
-    new ListViewSerializerSnapshot[T](listSerializer)
+    new ListViewSerializerSnapshot[T](this)
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 89cdf701749..e0067c57b10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -78,7 +78,7 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K,
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
   override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
-    new MapViewSerializerSnapshot[K, V](mapSerializer)
+    new MapViewSerializerSnapshot[K, V](this)
 
   // copy and modified from MapSerializer.ensureCompatibility
   override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 6fa9f02d419..d442d0d3ec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -147,7 +147,7 @@ public int hashCode() {
 
 	@Override
 	public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
-		return new ArrayListSerializerSnapshot<>(elementSerializer);
+		return new ArrayListSerializerSnapshot<>(this);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 7fc8c51c8cf..dde8d1a1e60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -18,72 +18,46 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ArrayListSerializer}.
  */
-public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
+public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ArrayListSerializerSnapshot() {}
+	public ArrayListSerializerSnapshot() {
+		super(ArrayListSerializer.class);
+	}
 
 	/**
 	 * Constructor for creating the snapshot for writing.
 	 */
-	public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+	public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
+		super(arrayListSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ArrayList<T>> restoreSerializer() {
-		return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof ArrayListSerializer) {
-			ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+		return new ArrayListSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index b67e47b4eff..26cfef5a1a8 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -18,81 +18,51 @@
 
 package org.apache.flink.api.scala.typeutils;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 
 import scala.util.Either;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ScalaEitherSerializerSnapshot() {}
+	public ScalaEitherSerializerSnapshot() {
+		super(EitherSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-		Preconditions.checkNotNull(leftSerializer);
-		Preconditions.checkNotNull(rightSerializer);
-		this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+	public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+		super(eitherSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Either<L, R>> restoreSerializer() {
-		return new EitherSerializer<>(
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
-			TypeSerializer<Either<L, R>> newSerializer) {
-		checkState(nestedLeftRightSerializerSnapshot != null);
+	protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];
 
-		if (newSerializer instanceof EitherSerializer) {
-			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+		@SuppressWarnings("unchecked")
+		TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];
 
-			return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getLeftSerializer(),
-				serializer.getRightSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+		return new EitherSerializer<>(leftSerializer, rightSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
 	}
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 68432a6f1a5..0427bb35f2b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -125,7 +125,7 @@ class EitherSerializer[A, B](
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
-    new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
+    new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
   override def ensureCompatibility(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services