You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/19 11:46:11 UTC
[flink] 01/05: [FLINK-17520] [core] Extend
CompositeTypeSerializerSnapshot to allow migration based on outer snapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d2cfc43cde393064b909a7d0f3c3c580b567c4fe
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 12:58:06 2020 +0800
[FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot
This commit deprecates isOuterSnapshotCompatible, which only allows
signaling if the outer config is either compatible or not compatible, in
favor of a new resolveOuterSchemaCompatibility method which additionally
allows the user to signal migration.
The change is backwards compatible, and allows subclasses that still
only implement isOuterSnapshotCompatible to work as is.
---
.../typeutils/CompositeTypeSerializerSnapshot.java | 78 +++++++++++++++++-----
1 file changed, 63 insertions(+), 15 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index fc4a40d..5e6c84e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -45,9 +45,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
- * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for its serializer snapshot
- * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
- * subclasses do not have any outer snapshot that needs to be persisted.
+ * {@link #resolveOuterSchemaCompatibility(TypeSerializer)} (TypeSerializer)} when using this class as the base
+ * for its serializer snapshot class. By default, the base implementations of these methods are empty, i.e. this
+ * class assumes that subclasses do not have any outer snapshot that needs to be persisted.
*
* <h2>Snapshot Versioning</h2>
*
@@ -82,6 +82,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@PublicEvolving
public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer<T>> implements TypeSerializerSnapshot<T> {
+ /**
+ * Indicates schema compatibility of the serializer configuration persisted as the outer snapshot.
+ */
+ protected enum OuterSchemaCompatibility {
+ COMPATIBLE_AS_IS,
+ COMPATIBLE_AFTER_MIGRATION,
+ INCOMPATIBLE
+ }
+
/** Magic number for integrity checks during deserialization. */
private static final int MAGIC_NUMBER = 911108;
@@ -168,10 +177,8 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
- // check that outer configuration is compatible; if not, short circuit result
- if (!isOuterSnapshotCompatible(castedNewSerializer)) {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
+ final OuterSchemaCompatibility outerSchemaCompatibility =
+ resolveOuterSchemaCompatibility(castedNewSerializer);
final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
// check that nested serializer arity remains identical; if not, short circuit result
@@ -179,7 +186,10 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
return TypeSerializerSchemaCompatibility.incompatible();
}
- return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots);
+ return constructFinalSchemaCompatibilityResult(
+ newNestedSerializers,
+ snapshots,
+ outerSchemaCompatibility);
}
@Internal
@@ -237,7 +247,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
* only has nested serializers and no extra information. Otherwise, if the outer serializer contains
* some extra information that needs to be persisted as part of the serializer snapshot, this
* must be overridden. Note that this method and the corresponding methods
- * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+ * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
* needs to be implemented.
*
* @param out the {@link DataOutputView} to write the outer snapshot to.
@@ -251,7 +261,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
* only has nested serializers and no extra information. Otherwise, if the outer serializer contains
* some extra information that has been persisted as part of the serializer snapshot, this
* must be overridden. Note that this method and the corresponding methods
- * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
* needs to be implemented.
*
* @param readOuterSnapshotVersion the read version of the outer snapshot.
@@ -275,11 +285,38 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
*
* @return a flag indicating whether or not the new serializer's outer information is compatible with the one
* written in this snapshot.
+ *
+ * @deprecated this method is deprecated, and will be removed in the future.
+ * Please implement {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
*/
+ @Deprecated
protected boolean isOuterSnapshotCompatible(S newSerializer) {
return true;
}
+ /**
+ * Checks the schema compatibility of the given new serializer based on the outer snapshot.
+ *
+ * <p>The base implementation of this method assumes that the outer serializer
+ * only has nested serializers and no extra information, and therefore the result of the check is
+ * {@link OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains
+ * some extra information that has been persisted as part of the serializer snapshot, this
+ * must be overridden. Note that this method and the corresponding methods
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+ * needs to be implemented.
+ *
+ * @param newSerializer the new serializer, which contains the new outer information to check against.
+ *
+ * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer
+ * information is compatible, requires migration, or incompatible with the one written
+ * in this snapshot.
+ */
+ protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(S newSerializer) {
+ return (isOuterSnapshotCompatible(newSerializer))
+ ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+ : OuterSchemaCompatibility.INCOMPATIBLE;
+ }
+
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
@@ -311,17 +348,28 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
TypeSerializer<?>[] newNestedSerializers,
- TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+ TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
+ OuterSchemaCompatibility outerSchemaCompatibility) {
- IntermediateCompatibilityResult<T> intermediateResult =
+ IntermediateCompatibilityResult<T> nestedSerializersCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(newNestedSerializers, nestedSerializerSnapshots);
- if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
+ if (outerSchemaCompatibility == OuterSchemaCompatibility.INCOMPATIBLE
+ || nestedSerializersCompatibilityResult.isIncompatible()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ if (outerSchemaCompatibility == OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION
+ || nestedSerializersCompatibilityResult.isCompatibleAfterMigration()) {
+ return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+ }
+
+ if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
@SuppressWarnings("unchecked")
- TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(intermediateResult.getNestedSerializers());
+ TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(nestedSerializersCompatibilityResult.getNestedSerializers());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
}
- return intermediateResult.getFinalResult();
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}