You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/07 13:33:07 UTC
[flink] 01/05: [FLINK-11073] [core] Add
COMPATIBLE_WITH_RECONFIGURED_SERIALIZER to
TypeSerializerSchemaCompatibility
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1794ea06bd351a5b3c4219d4097b2516c0298447
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Nov 29 16:32:01 2018 +0800
[FLINK-11073] [core] Add COMPATIBLE_WITH_RECONFIGURED_SERIALIZER to TypeSerializerSchemaCompatibility
---
.../TypeSerializerSchemaCompatibility.java | 44 ++++++++++++++++++++++
1 file changed, 44 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
index d488799..4bb4aa0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
@@ -57,6 +57,12 @@ public class TypeSerializerSchemaCompatibility<T> {
COMPATIBLE_AFTER_MIGRATION,
/**
+ * This indicates that a reconfigured version of the new serializer
+ * is compatible, and should be used instead of the original new serializer.
+ */
+ COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+
+ /**
* This indicates that the new serializer is incompatible, even with migration.
* This normally implies that the deserialized Java class can not be commonly recognized
* by the previous and new serializer.
@@ -69,6 +75,8 @@ public class TypeSerializerSchemaCompatibility<T> {
*/
private final Type resultType;
+ private final TypeSerializer<T> reconfiguredNewSerializer;
+
/**
* Returns a result that indicates that the new serializer is compatible and no migration is required.
* The new serializer can continued to be used as is.
@@ -90,6 +98,20 @@ public class TypeSerializerSchemaCompatibility<T> {
}
/**
+ * Returns a result that indicates a reconfigured version of the new serializer is compatible, and should be
+ * used instead of the original new serializer.
+ *
+ * @param reconfiguredSerializer the reconfigured version of the new serializer.
+ * @return a result that indicates a reconfigured version of the new serializer is compatible, and should be
+ * used instead of the original new serializer.
+ */
+ public static <T> TypeSerializerSchemaCompatibility<T> compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializer) {
+ return new TypeSerializerSchemaCompatibility<>(
+ Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+ Preconditions.checkNotNull(reconfiguredSerializer));
+ }
+
+ /**
* Returns a result that indicates there is no possible way for the new serializer to be use-able.
* This normally indicates that there is no common Java class between what the previous bytes can be
* deserialized into and what can be written by the new serializer.
@@ -105,6 +127,7 @@ public class TypeSerializerSchemaCompatibility<T> {
private TypeSerializerSchemaCompatibility(Type resultType, @Nullable TypeSerializer<T> reconfiguredNewSerializer) {
this.resultType = Preconditions.checkNotNull(resultType);
+ this.reconfiguredNewSerializer = reconfiguredNewSerializer;
}
/**
@@ -126,6 +149,27 @@ public class TypeSerializerSchemaCompatibility<T> {
}
/**
+ * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+ *
+ * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+ */
+ public boolean isCompatibleWithReconfiguredSerializer() {
+ return resultType == Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
+ }
+
+ /**
+ * Gets the reconfigured serializer. This throws an exception if
+ * {@link #isCompatibleWithReconfiguredSerializer()} is {@code false}.
+ */
+ public TypeSerializer<T> getReconfiguredSerializer() {
+ Preconditions.checkState(
+ isCompatibleWithReconfiguredSerializer(),
+ "It is only possible to get a reconfigured serializer if the compatibility type is %s, but the type is %s",
+ Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, resultType);
+ return reconfiguredNewSerializer;
+ }
+
+ /**
* Returns whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
*
* @return whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.