You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/07 13:33:07 UTC

[flink] 01/05: [FLINK-11073] [core] Add COMPATIBLE_WITH_RECONFIGURED_SERIALIZER to TypeSerializerSchemaCompatibility

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1794ea06bd351a5b3c4219d4097b2516c0298447
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Nov 29 16:32:01 2018 +0800

    [FLINK-11073] [core] Add COMPATIBLE_WITH_RECONFIGURED_SERIALIZER to TypeSerializerSchemaCompatibility
---
 .../TypeSerializerSchemaCompatibility.java         | 44 ++++++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
index d488799..4bb4aa0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
@@ -57,6 +57,12 @@ public class TypeSerializerSchemaCompatibility<T> {
 		COMPATIBLE_AFTER_MIGRATION,
 
 		/**
+		 * This indicates that a reconfigured version of the new serializer
+		 * is compatible, and should be used instead of the original new serializer.
+		 */
+		COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+
+		/**
 		 * This indicates that the new serializer is incompatible, even with migration.
 		 * This normally implies that the deserialized Java class can not be commonly recognized
 		 * by the previous and new serializer.
@@ -69,6 +75,8 @@ public class TypeSerializerSchemaCompatibility<T> {
 	 */
 	private final Type resultType;
 
+	private final TypeSerializer<T> reconfiguredNewSerializer;
+
 	/**
 	 * Returns a result that indicates that the new serializer is compatible and no migration is required.
 	 * The new serializer can continued to be used as is.
@@ -90,6 +98,20 @@ public class TypeSerializerSchemaCompatibility<T> {
 	}
 
 	/**
+	 * Returns a result that indicates a reconfigured version of the new serializer is compatible, and should be
+	 * used instead of the original new serializer.
+	 *
+	 * @param reconfiguredSerializer the reconfigured version of the new serializer.
+	 * @return a result that indicates a reconfigured version of the new serializer is compatible, and should be
+	 *         used instead of the original new serializer.
+	 */
+	public static <T> TypeSerializerSchemaCompatibility<T> compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializer) {
+		return new TypeSerializerSchemaCompatibility<>(
+			Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+			Preconditions.checkNotNull(reconfiguredSerializer));
+	}
+
+	/**
 	 * Returns a result that indicates there is no possible way for the new serializer to be use-able.
 	 * This normally indicates that there is no common Java class between what the previous bytes can be
 	 * deserialized into and what can be written by the new serializer.
@@ -105,6 +127,7 @@ public class TypeSerializerSchemaCompatibility<T> {
 
 	private TypeSerializerSchemaCompatibility(Type resultType, @Nullable TypeSerializer<T> reconfiguredNewSerializer) {
 		this.resultType = Preconditions.checkNotNull(resultType);
+		this.reconfiguredNewSerializer = reconfiguredNewSerializer;
 	}
 
 	/**
@@ -126,6 +149,27 @@ public class TypeSerializerSchemaCompatibility<T> {
 	}
 
 	/**
+	 * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+	 *
+	 * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+	 */
+	public boolean isCompatibleWithReconfiguredSerializer() {
+		return resultType == Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
+	}
+
+	/**
+	 * Gets the reconfigured serializer. This throws an exception if
+	 * {@link #isCompatibleWithReconfiguredSerializer()} is {@code false}.
+	 */
+	public TypeSerializer<T> getReconfiguredSerializer() {
+		Preconditions.checkState(
+			isCompatibleWithReconfiguredSerializer(),
+			"It is only possible to get a reconfigured serializer if the compatibility type is %s, but the type is %s",
+			Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, resultType);
+		return reconfiguredNewSerializer;
+	}
+
+	/**
 	 * Returns whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
 	 *
 	 * @return whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.