You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:14 UTC

[flink] 01/12: [FLINK-11073] [core] Introduce CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit beee5b168875fc54bc09780942c4fc724ffccbf0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 15:51:12 2018 +0800

    [FLINK-11073] [core] Introduce CompositeTypeSerializerSnapshot
    
    The CompositeTypeSerializerSnapshot encapsulates logic for handling
    writing, reading, and deriving final compatibility results for composite
    serializers that have multiple nested serializers as well as some
    static outer configuration (e.g. type class in the
    GenericArraySerializer).
    
    his base class has its own versioning for the format in which it writes
    the outer snapshot and the nested serializer snapshots. The version of
    the serialization format of this based class is defined
    by getCurrentVersion(). This is independent of the version in which
    subclasses writes their outer snapshot, defined by
    getCurrentOuterSnapshotVersion().
    
    This means that the outer snapshot's version can be maintained only
    taking into account changes in how the outer snapshot is written.
    Any changes in the base format does not require upticks in the outer
    snapshot's version
---
 .../typeutils/CompositeSerializerSnapshot.java     |  10 +
 .../typeutils/CompositeTypeSerializerSnapshot.java | 346 +++++++++++++
 .../CompositeTypeSerializerSnapshotTest.java       | 544 +++++++++++++++++++++
 3 files changed, 900 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
index 93f5a70..5b3f775 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
@@ -94,9 +94,19 @@ public class CompositeSerializerSnapshot {
 	}
 
 	/**
+	 * Returns the snapshots of the nested serializers.
+	 *
+	 * @return the snapshots of the nested serializers.
+	 */
+	public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+		return nestedSnapshots;
+	}
+
+	/**
 	 * Resolves the compatibility of the nested serializer snapshots with the nested
 	 * serializers of the new outer serializer.
 	 */
+	@Deprecated
 	public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
 			TypeSerializerSchemaCompatibility<?> outerCompatibility,
 			TypeSerializer<?>... newNestedSerializers) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
new file mode 100644
index 0000000..71c0836
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link CompositeTypeSerializerSnapshot} is a convenient serializer snapshot class that can be used by
+ * simple serializers which 1) delegates its serialization to multiple nested serializers, and 2) may contain
+ * some extra static information that needs to be persisted as part of its snapshot.
+ *
+ * <p>Examples for this would be the {@link ListSerializer}, {@link MapSerializer}, {@link EitherSerializer}, etc.,
+ * in which case the serializer, called the "outer" serializer in this context, has only some nested serializers that
+ * needs to be persisted as its snapshot, and nothing else that needs to be persisted as the "outer" snapshot.
+ * An example which has non-empty outer snapshots would be the {@link GenericArraySerializer}, which beyond the
+ * nested component serializer, also contains a class of the component type that needs to be persisted.
+ *
+ * <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
+ * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for its serializer snapshot
+ * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
+ * subclasses do not have any outer snapshot that needs to be persisted.
+ *
+ * <h2>Snapshot Versioning</h2>
+ *
+ * <p>This base class has its own versioning for the format in which it writes the outer snapshot and the
+ * nested serializer snapshots. The version of the serialization format of this based class is defined
+ * by {@link #getCurrentVersion()}. This is independent of the version in which subclasses writes their outer snapshot,
+ * defined by {@link #getCurrentOuterSnapshotVersion()}.
+ * This means that the outer snapshot's version can be maintained only taking into account changes in how the
+ * outer snapshot is written. Any changes in the base format does not require upticks in the outer snapshot's version.
+ *
+ * <h2>Serialization Format</hr>
+ *
+ * <p>The current version of the serialization format of a {@link CompositeTypeSerializerSnapshot} is as follows:
+ *
+ * <pre>{@code
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | CompositeTypeSerializerSnapshot | CompositeTypeSerializerSnapshot |          Outer snapshot         |
+ * |           version               |          MAGIC_NUMBER           |              version            |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                               Outer snapshot                                        |
+ * |                                   #writeOuterSnapshot(DataOutputView out)                           |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |      Delegate MAGIC_NUMBER      |         Delegate version        |     Num. nested serializers     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                     Nested serializer snapshots                                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }</pre>
+ *
+ * @param <T> The data type that the originating serializer of this snapshot serializes.
+ * @param <S> The type of the originating serializer.
+ */
+@PublicEvolving
+public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer> implements TypeSerializerSnapshot<T> {
+
+	/** Magic number for integrity checks during deserialization. */
+	private static final int MAGIC_NUMBER = 911108;
+
+	/**
+	 * Current version of the base serialization format.
+	 *
+	 * <p>NOTE: We start from version 3. This version is represented by the {@link #getCurrentVersion()} method.
+	 * Previously, this method was used to represent the outer snapshot's version (now, represented
+	 * by the {@link #getCurrentOuterSnapshotVersion()} method).
+	 *
+	 * <p>To bridge this transition, we set the starting version of the base format to be at least
+	 * larger than the highest version of previously defined values in implementing subclasses,
+	 * which was {@link #HIGHEST_LEGACY_READ_VERSION}. This allows us to identify legacy deserialization paths,
+	 * which did not contain versioning for the base format, simply by checking if the read
+	 * version of the snapshot is smaller than or equal to {@link #HIGHEST_LEGACY_READ_VERSION}.
+	 */
+	private static final int VERSION = 3;
+
+	private static final int HIGHEST_LEGACY_READ_VERSION = 2;
+
+	private CompositeSerializerSnapshot compositeSerializerSnapshot;
+
+	private final Class<S> correspondingSerializerClass;
+
+	/**
+	 * Constructor to be used for read instantiation.
+	 *
+	 * @param correspondingSerializerClass the expected class of the new serializer.
+	 */
+	public CompositeTypeSerializerSnapshot(Class<S> correspondingSerializerClass) {
+		this.correspondingSerializerClass = Preconditions.checkNotNull(correspondingSerializerClass);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot.
+	 *
+	 * @param serializerInstance an instance of the originating serializer of this snapshot.
+	 */
+	@SuppressWarnings("unchecked")
+	public CompositeTypeSerializerSnapshot(S serializerInstance) {
+		Preconditions.checkNotNull(serializerInstance);
+		this.compositeSerializerSnapshot = new CompositeSerializerSnapshot(getNestedSerializers(serializerInstance));
+		this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass();
+	}
+
+	@Override
+	public final int getCurrentVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public final void writeSnapshot(DataOutputView out) throws IOException {
+		internalWriteOuterSnapshot(out);
+		compositeSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public final void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		if (readVersion > HIGHEST_LEGACY_READ_VERSION) {
+			internalReadOuterSnapshot(in, userCodeClassLoader);
+		} else {
+			legacyInternalReadOuterSnapshot(readVersion, in, userCodeClassLoader);
+		}
+		this.compositeSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+
+	@Override
+	public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+		if (newSerializer.getClass() != correspondingSerializerClass) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
+
+		// check that outer configuration is compatible; if not, short circuit result
+		if (!isOuterSnapshotCompatible(castedNewSerializer)) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
+		return constructFinalSchemaCompatibilityResult(
+			getNestedSerializers(castedNewSerializer),
+			compositeSerializerSnapshot.getNestedSerializerSnapshots());
+	}
+
+	@Override
+	public final TypeSerializer<T> restoreSerializer() {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> serializer = (TypeSerializer<T>)
+			createOuterSerializerWithNestedSerializers(compositeSerializerSnapshot.getRestoreSerializers());
+
+		return serializer;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer serializer access methods
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the version of the current outer snapshot's written binary format.
+	 *
+	 * @return the version of the current outer snapshot's written binary format.
+	 */
+	protected abstract int getCurrentOuterSnapshotVersion();
+
+	/**
+	 * Gets the nested serializers from the outer serializer.
+	 *
+	 * @param outerSerializer the outer serializer.
+	 *
+	 * @return the nested serializers.
+	 */
+	protected abstract TypeSerializer<?>[] getNestedSerializers(S outerSerializer);
+
+	/**
+	 * Creates an instance of the outer serializer with a given array of its nested serializers.
+	 *
+	 * @param nestedSerializers array of nested serializers to create the outer serializer with.
+	 *
+	 * @return an instance of the outer serializer.
+	 */
+	protected abstract S createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers);
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer snapshot methods; need to be overridden if outer snapshot is not empty,
+	//  or in other words, the outer serializer has extra configuration beyond its nested serializers.
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Writes the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this methods writes nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that needs to be persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * needs to be implemented.
+	 *
+	 * @param out the {@link DataOutputView} to write the outer snapshot to.
+	 */
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {}
+
+	/**
+	 * Reads the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this methods reads nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * needs to be implemented.
+	 *
+	 * @param readOuterSnapshotVersion the read version of the outer snapshot.
+	 * @param in the {@link DataInputView} to read the outer snapshot from.
+	 * @param userCodeClassLoader the user code class loader.
+	 */
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {}
+
+	/**
+	 * Checks whether the outer snapshot is compatible with a given new serializer.
+	 *
+	 * <p>The base implementation of this method just returns {@code true}, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information, and therefore the result of the check must always
+	 * be true. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+	 * needs to be implemented.
+	 *
+	 * @param newSerializer the new serializer, which contains the new outer information to check against.
+	 *
+	 * @return a flag indicating whether or not the new serializer's outer information is compatible with the one
+	 *         written in this snapshot.
+	 */
+	protected boolean isOuterSnapshotCompatible(S newSerializer) {
+		return true;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------------------------
+
+	private void internalWriteOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeInt(MAGIC_NUMBER);
+		out.writeInt(getCurrentOuterSnapshotVersion());
+
+		writeOuterSnapshot(out);
+	}
+
+	private void internalReadOuterSnapshot(DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		final int magicNumber = in.readInt();
+		if (magicNumber != MAGIC_NUMBER) {
+			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
+				MAGIC_NUMBER, magicNumber));
+		}
+
+		final int outerSnapshotVersion = in.readInt();
+		readOuterSnapshot(outerSnapshotVersion, in, userCodeClassLoader);
+	}
+
+	private void legacyInternalReadOuterSnapshot(
+			int legacyReadVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+
+		// legacy versions did not contain the pre-fixed magic numbers; just read the outer snapshot
+		readOuterSnapshot(legacyReadVersion, in, userCodeClassLoader);
+	}
+
+	private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
+			TypeSerializer<?>[] newNestedSerializers,
+			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+
+		Preconditions.checkArgument(newNestedSerializers.length == nestedSerializerSnapshots.length,
+			"Different number of new serializers and existing serializer snapshots.");
+
+		TypeSerializer<?>[] reconfiguredNestedSerializers = new TypeSerializer[newNestedSerializers.length];
+
+		// check nested serializers for compatibility
+		boolean nestedSerializerRequiresMigration = false;
+		boolean hasReconfiguredNestedSerializers = false;
+		for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
+			TypeSerializerSchemaCompatibility<?> compatibility =
+				resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);
+
+			// if any one of the new nested serializers is incompatible, we can just short circuit the result
+			if (compatibility.isIncompatible()) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+
+			if (compatibility.isCompatibleAfterMigration()) {
+				nestedSerializerRequiresMigration = true;
+			} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
+				hasReconfiguredNestedSerializers = true;
+				reconfiguredNestedSerializers[i] = compatibility.getReconfiguredSerializer();
+			} else if (compatibility.isCompatibleAsIs()) {
+				reconfiguredNestedSerializers[i] = newNestedSerializers[i];
+			} else {
+				throw new IllegalStateException("Undefined compatibility type.");
+			}
+		}
+
+		if (nestedSerializerRequiresMigration) {
+			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+		}
+
+		if (hasReconfiguredNestedSerializers) {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(reconfiguredNestedSerializers);
+			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
+		}
+
+		// ends up here if everything is compatible as is
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
+		TypeSerializer<?> serializer,
+		TypeSerializerSnapshot<?> snapshot) {
+
+		TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
+		TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;
+
+		return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
new file mode 100644
index 0000000..0f77e3d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test suite for the {@link CompositeTypeSerializerSnapshot}.
+ */
+public class CompositeTypeSerializerSnapshotTest {
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#resolveSchemaCompatibility
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testIncompatiblePrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		final TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER)
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	@Test
+	public void testCompatibleAfterMigrationPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAfterMigration());
+	}
+
+	@Test
+	public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
+
+		TestCompositeTypeSerializer reconfiguredSerializer =
+			(TestCompositeTypeSerializer) compatibility.getReconfiguredSerializer();
+		TypeSerializer<?>[] reconfiguredNestedSerializers = reconfiguredSerializer.getNestedSerializers();
+		// nested serializer at index 1 should strictly be a ReconfiguredNestedSerializer
+		Assert.assertTrue(reconfiguredNestedSerializers[0].getClass() == NestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[1].getClass() == ReconfiguredNestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[2].getClass() == NestedSerializer.class);
+	}
+
+	@Test
+	public void testCompatibleAsIsPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAsIs());
+	}
+
+	@Test
+	public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
+		final String INIT_OUTER_CONFIG = "outer-config";
+		final String INCOMPAT_OUTER_CONFIG = "incompat-outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				INIT_OUTER_CONFIG,
+				INCOMPAT_OUTER_CONFIG);
+
+		// even though nested serializers are compatible, incompatibility of the outer
+		// snapshot should have higher precedence in the final result
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+			TypeSerializer<?>[] testNestedSerializers,
+			String initialOuterConfiguration,
+			String newOuterConfiguration) throws IOException {
+		TestCompositeTypeSerializer testSerializer =
+			new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		TestCompositeTypeSerializer newTestSerializer =
+			new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers);
+		return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#restoreSerializer
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testRestoreCompositeTypeSerializer() throws IOException {
+		// the target compatibilities of the nested serializers doesn't matter,
+		// because we're only testing the restore serializer
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
+		};
+
+		TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		// now, restore the composite type serializer;
+		// the restored nested serializer should be a RestoredNestedSerializer
+		testSerializer = (TestCompositeTypeSerializer) testSerializerSnapshot.restoreSerializer();
+		Assert.assertTrue(testSerializer.getNestedSerializers()[0].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[1].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[2].getClass() == RestoredNestedSerializer.class);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Test utilities
+	// ------------------------------------------------------------------------------------------------
+
+	/**
+	 * A simple composite serializer used for testing.
+	 * It can be configured with an array of nested serializers, as well as outer configuration (represented as String).
+	 */
+	public static class TestCompositeTypeSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -545688468997398105L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final String outerConfiguration;
+
+		private final TypeSerializer<?>[] nestedSerializers;
+
+		TestCompositeTypeSerializer(
+				String outerConfiguration,
+				TypeSerializer<?>[] nestedSerializers) {
+			this.outerConfiguration = outerConfiguration;
+			this.nestedSerializers = nestedSerializers;
+		}
+
+		public String getOuterConfiguration() {
+			return outerConfiguration;
+		}
+
+		TypeSerializer<?>[] getNestedSerializers() {
+			return nestedSerializers;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new TestCompositeTypeSerializerSnapshot(this);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return Arrays.equals(nestedSerializers, ((TestCompositeTypeSerializer) obj).getNestedSerializers());
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(nestedSerializers);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestCompositeTypeSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot class for the {@link TestCompositeTypeSerializer}.
+	 */
+	public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {
+
+		private String outerConfiguration;
+
+		public TestCompositeTypeSerializerSnapshot() {
+			super(TestCompositeTypeSerializer.class);
+		}
+
+		TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
+			super(serializer);
+			this.outerConfiguration = serializer.getOuterConfiguration();
+		}
+
+		@Override
+		protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(TestCompositeTypeSerializer outerSerializer) {
+			return outerSerializer.getNestedSerializers();
+		}
+
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeUTF(outerConfiguration);
+		}
+
+		@Override
+		public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
+			this.outerConfiguration = in.readUTF();
+		}
+
+		@Override
+		protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
+			return outerConfiguration.equals(newSerializer.getOuterConfiguration());
+		}
+
+		@Override
+		public int getCurrentOuterSnapshotVersion() {
+			return 1;
+		}
+	}
+
+	public enum TargetCompatibility {
+		COMPATIBLE_AS_IS,
+		COMPATIBLE_AFTER_MIGRATION,
+		COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+		INCOMPATIBLE
+	}
+
+	/**
+	 * Used as nested serializers in the test composite serializer.
+	 * A nested serializer can be configured with a {@link TargetCompatibility},
+	 * which indicates what the result of the schema compatibility check should be
+	 * when a new instance of it is being checked for compatibility.
+	 */
+	public static class NestedSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -6175000932620623446L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final TargetCompatibility targetCompatibility;
+
+		NestedSerializer(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new NestedSerializerSnapshot(targetCompatibility);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return targetCompatibility == ((NestedSerializer) obj).targetCompatibility;
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return targetCompatibility.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof NestedSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot of the {@link NestedSerializer}.
+	 */
+	public static class NestedSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+		private TargetCompatibility targetCompatibility;
+
+		public NestedSerializerSnapshot() {}
+
+		public NestedSerializerSnapshot(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public void writeSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(targetCompatibility.ordinal());
+		}
+
+		@Override
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			this.targetCompatibility = TargetCompatibility.values()[in.readInt()];
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+			// checks the exact class instead of using instanceof;
+			// this ensures that we get a new serializer, and not a ReconfiguredNestedSerializer or RestoredNestedSerializer
+			if (newSerializer.getClass() == NestedSerializer.class) {
+				switch (targetCompatibility) {
+					case COMPATIBLE_AS_IS:
+						return TypeSerializerSchemaCompatibility.compatibleAsIs();
+					case COMPATIBLE_AFTER_MIGRATION:
+						return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+					case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
+						return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+							new ReconfiguredNestedSerializer(targetCompatibility));
+					case INCOMPATIBLE:
+						return TypeSerializerSchemaCompatibility.incompatible();
+					default:
+						throw new IllegalStateException("Unexpected target compatibility.");
+				}
+			}
+
+			throw new IllegalArgumentException("Expected the new serializer to be of class " + NestedSerializer.class);
+		}
+
+		@Override
+		public TypeSerializer<String> restoreSerializer() {
+			return new RestoredNestedSerializer(targetCompatibility);
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 1;
+		}
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a reconfigured instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class ReconfiguredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public ReconfiguredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a restored instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class RestoredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public RestoredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+}