You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/19 11:46:11 UTC

[flink] 01/05: [FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d2cfc43cde393064b909a7d0f3c3c580b567c4fe
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 12:58:06 2020 +0800

    [FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot
    
    This commit deprecates isOuterSnapshotCompatible, which only allows
    signaling if the outer config is either compatible or not compatible, in
    favor of a new resolveOuterSchemaCompatibility method which additionally
    allows the user to signal migration.
    
    The change is backwards compatible, and allows subclasses that still
    only implement isOuterSnapshotCompatible to work as is.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 78 +++++++++++++++++-----
 1 file changed, 63 insertions(+), 15 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index fc4a40d..5e6c84e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -45,9 +45,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
  * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
- * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for its serializer snapshot
- * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
- * subclasses do not have any outer snapshot that needs to be persisted.
+ * {@link #resolveOuterSchemaCompatibility(TypeSerializer)} when using this class as the base for its serializer
+ * snapshot class. By default, the base implementations of these methods assume there is no outer snapshot, i.e. this
+ * class assumes that subclasses do not have any outer snapshot that needs to be persisted.
  *
  * <h2>Snapshot Versioning</h2>
  *
@@ -82,6 +82,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer<T>> implements TypeSerializerSnapshot<T> {
 
+	/**
+	 * Indicates schema compatibility of the serializer configuration persisted as the outer snapshot.
+	 */
+	protected enum OuterSchemaCompatibility {
+		COMPATIBLE_AS_IS,
+		COMPATIBLE_AFTER_MIGRATION,
+		INCOMPATIBLE
+	}
+
 	/** Magic number for integrity checks during deserialization. */
 	private static final int MAGIC_NUMBER = 911108;
 
@@ -168,10 +177,8 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 		S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
 
-		// check that outer configuration is compatible; if not, short circuit result
-		if (!isOuterSnapshotCompatible(castedNewSerializer)) {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
+		final OuterSchemaCompatibility outerSchemaCompatibility =
+			resolveOuterSchemaCompatibility(castedNewSerializer);
 
 		final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
 		// check that nested serializer arity remains identical; if not, short circuit result
@@ -179,7 +186,10 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 
-		return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots);
+		return constructFinalSchemaCompatibilityResult(
+			newNestedSerializers,
+			snapshots,
+			outerSchemaCompatibility);
 	}
 
 	@Internal
@@ -237,7 +247,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
 	 * some extra information that needs to be persisted as part of the serializer snapshot, this
 	 * must be overridden. Note that this method and the corresponding methods
-	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
 	 * needs to be implemented.
 	 *
 	 * @param out the {@link DataOutputView} to write the outer snapshot to.
@@ -251,7 +261,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
 	 * some extra information that has been persisted as part of the serializer snapshot, this
 	 * must be overridden. Note that this method and the corresponding methods
-	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
 	 * needs to be implemented.
 	 *
 	 * @param readOuterSnapshotVersion the read version of the outer snapshot.
@@ -275,11 +285,38 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 *
 	 * @return a flag indicating whether or not the new serializer's outer information is compatible with the one
 	 *         written in this snapshot.
+	 *
+	 * @deprecated this method is deprecated, and will be removed in the future.
+	 *             Please implement {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
 	 */
+	@Deprecated
 	protected boolean isOuterSnapshotCompatible(S newSerializer) {
 		return true;
 	}
 
+	/**
+	 * Checks the schema compatibility of the given new serializer based on the outer snapshot.
+	 *
+	 * <p>The base implementation of this method assumes that the outer serializer
+	 * only has nested serializers and no extra information, and therefore the result of the check is
+	 * {@link OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+	 * need to be implemented.
+	 *
+	 * @param newSerializer the new serializer, which contains the new outer information to check against.
+	 *
+	 * @return an {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
+	 *         information is compatible as is, compatible after migration, or incompatible with
+	 *         the one written in this snapshot.
+	 */
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(S newSerializer) {
+		return (isOuterSnapshotCompatible(newSerializer))
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
+	}
+
 	// ------------------------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------------------------
@@ -311,17 +348,28 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
 			TypeSerializer<?>[] newNestedSerializers,
-			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
+			OuterSchemaCompatibility outerSchemaCompatibility) {
 
-		IntermediateCompatibilityResult<T> intermediateResult =
+		IntermediateCompatibilityResult<T> nestedSerializersCompatibilityResult =
 			CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(newNestedSerializers, nestedSerializerSnapshots);
 
-		if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
+		if (outerSchemaCompatibility == OuterSchemaCompatibility.INCOMPATIBLE
+				|| nestedSerializersCompatibilityResult.isIncompatible()) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		if (outerSchemaCompatibility == OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION
+				|| nestedSerializersCompatibilityResult.isCompatibleAfterMigration()) {
+			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+		}
+
+		if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
 			@SuppressWarnings("unchecked")
-			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(intermediateResult.getNestedSerializers());
+			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(nestedSerializersCompatibilityResult.getNestedSerializers());
 			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
 		}
 
-		return intermediateResult.getFinalResult();
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
 	}
 }