You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/18 08:48:58 UTC
[flink] 02/02: [FLINK-11372] [core] Incorrect delegation of
compatibility checks to new CompositeTypeSerializerSnapshots
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e7b80f755efc257d7a2803fc604e2746b6530a3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 16 18:06:16 2019 +0100
[FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots
This closes #7521.
---
.../typeutils/CompositeTypeSerializerSnapshot.java | 11 ++++-
.../typeutils/CompositeTypeSerializerUtil.java | 51 ++++++++++++++++++++++
.../base/CollectionSerializerConfigSnapshot.java | 18 ++++----
.../base/GenericArraySerializerConfigSnapshot.java | 18 +++++---
.../base/GenericArraySerializerSnapshot.java | 9 ++++
.../base/MapSerializerConfigSnapshot.java | 15 +++++--
.../runtime/EitherSerializerConfigSnapshot.java | 33 +++++---------
.../runtime/EitherSerializerSnapshot.java | 10 +++--
8 files changed, 116 insertions(+), 49 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index c73e24c..49ff8a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.typeutils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
@@ -147,6 +148,13 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
@Override
public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+ return internalResolveSchemaCompatibility(newSerializer, nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+ }
+
+ @Internal
+ TypeSerializerSchemaCompatibility<T> internalResolveSchemaCompatibility(
+ TypeSerializer<T> newSerializer,
+ TypeSerializerSnapshot<?>[] snapshots) {
if (newSerializer.getClass() != correspondingSerializerClass) {
return TypeSerializerSchemaCompatibility.incompatible();
}
@@ -158,10 +166,9 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
return TypeSerializerSchemaCompatibility.incompatible();
}
- // since outer configuration is compatible, the final compatibility result depends only on the nested serializers
return constructFinalSchemaCompatibilityResult(
getNestedSerializers(castedNewSerializer),
- nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+ snapshots);
}
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
new file mode 100644
index 0000000..3c162e3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utilities for the {@link CompositeTypeSerializerSnapshot}.
+ */
+@Internal
+public class CompositeTypeSerializerUtil {
+
+ /**
+ * Delegates compatibility checks to a {@link CompositeTypeSerializerSnapshot} instance.
+ * This can be used by legacy snapshot classes, which have a newer implementation
+ * implemented as a {@link CompositeTypeSerializerSnapshot}.
+ *
+ * @param newSerializer the new serializer to check for compatibility.
+ * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility checks to.
+ * This instance should already contain the outer snapshot information.
+ * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite snapshot.
+ *
+ * @return the result compatibility.
+ */
+ public static <T> TypeSerializerSchemaCompatibility<T> delegateCompatibilityCheckToNewSnapshot(
+ TypeSerializer<T> newSerializer,
+ CompositeTypeSerializerSnapshot<T, ? extends TypeSerializer> newCompositeSnapshot,
+ TypeSerializerSnapshot<?>... legacyNestedSnapshots) {
+
+ checkArgument(legacyNestedSnapshots.length > 0);
+ return newCompositeSnapshot.internalResolveSchemaCompatibility(newSerializer, legacyNestedSnapshots);
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 762a441..b485ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -47,18 +49,16 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T
}
@Override
+ @SuppressWarnings("unchecked")
public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
- if (newSerializer instanceof ListSerializer) {
- ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
- ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
-
- @SuppressWarnings("unchecked")
- TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
- listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
- return result;
- } else {
+ if (!(newSerializer instanceof ListSerializer)) {
return super.resolveSchemaCompatibility(newSerializer);
}
+
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ (CompositeTypeSerializerSnapshot<C, ? extends TypeSerializer>) new ListSerializerSnapshot<>(),
+ getSingleNestedSerializerAndConfig().f1);
}
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index cfc2e98..380911e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -115,20 +116,23 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
}
@Override
- public TypeSerializer<C[]> restoreSerializer() {
+ public GenericArraySerializer<C> restoreSerializer() {
checkState(componentClass != null && nestedSnapshot != null);
return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
}
@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
- if (newSerializer instanceof GenericArraySerializer) {
- // delegate to the new snapshot class
- GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
- GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
- return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
- } else {
+ checkState(nestedSnapshot != null);
+
+ if (!(newSerializer instanceof GenericArraySerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
+
+ // delegate to the new snapshot class
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new GenericArraySerializerSnapshot<>(componentClass),
+ nestedSnapshot.getNestedSerializerSnapshots());
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index 3f54dee..d39b58e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -52,6 +52,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
this.componentClass = genericArraySerializer.getComponentClass();
}
+ /**
+ * Constructor that the legacy {@link GenericArraySerializerConfigSnapshot} uses
+ * to delegate compatibility checks to this class.
+ */
+ GenericArraySerializerSnapshot(Class<C> componentClass) {
+ super(GenericArraySerializer.class);
+ this.componentClass = componentClass;
+ }
+
@Override
protected int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 2b78b52..8c7e128 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -20,9 +20,13 @@ package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.List;
import java.util.Map;
/**
@@ -47,11 +51,14 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
@Override
public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
if (newSerializer instanceof MapSerializer) {
- // redirect the compatibility check to the new MapSerializerConfigSnapshot
- MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
+ List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
- MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
- return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+ // redirect the compatibility check to the new MapSerializerSnapshot
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new MapSerializerSnapshot<>(),
+ nestedSerializersAndConfigs.get(0).f1,
+ nestedSerializersAndConfigs.get(1).f1);
}
else {
return super.resolveSchemaCompatibility(newSerializer);
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index d186953..a4863b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -20,11 +20,15 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Either;
+import java.util.List;
+
/**
* Deprecated config snapshot retained for savepoint compatibility with Flink 1.6 and earlier.
*/
@@ -51,7 +55,12 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
// this class was shared between the Java Either Serializer and the
// Scala Either serializer
if (newSerializer.getClass() == EitherSerializer.class) {
- return checkJavaSerializerCompatibility((EitherSerializer<L, R>) newSerializer);
+ List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new JavaEitherSerializerSnapshot<>(),
+ nestedSerializersAndConfigs.get(0).f1,
+ nestedSerializersAndConfigs.get(1).f1);
}
else {
// Scala Either Serializer, or other.
@@ -59,26 +68,4 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
return super.resolveSchemaCompatibility(newSerializer);
}
}
-
- @SuppressWarnings("unchecked")
- private TypeSerializerSchemaCompatibility<Either<L, R>> checkJavaSerializerCompatibility(
- EitherSerializer<L, R> serializer) {
-
- TypeSerializer<L> leftSerializer = serializer.getLeftSerializer();
- TypeSerializer<R> rightSerializer = serializer.getRightSerializer();
-
- TypeSerializerSnapshot<L> leftSnapshot = (TypeSerializerSnapshot<L>) getNestedSerializersAndConfigs().get(0).f1;
- TypeSerializerSnapshot<R> rightSnapshot = (TypeSerializerSnapshot<R>) getNestedSerializersAndConfigs().get(1).f1;
-
- TypeSerializerSchemaCompatibility<?> leftCompatibility = leftSnapshot.resolveSchemaCompatibility(leftSerializer);
- TypeSerializerSchemaCompatibility<?> rightCompatibility = rightSnapshot.resolveSchemaCompatibility(rightSerializer);
-
- if (leftCompatibility.isCompatibleAsIs() && rightCompatibility.isCompatibleAsIs()) {
- return TypeSerializerSchemaCompatibility.compatibleAsIs();
- }
- if (leftCompatibility.isCompatibleAfterMigration() && rightCompatibility.isCompatibleAfterMigration()) {
- return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
- }
- return TypeSerializerSchemaCompatibility.incompatible();
- }
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 1779ec8..1c84366 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -101,7 +102,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
}
@Override
- public TypeSerializer<Either<L, R>> restoreSerializer() {
+ public EitherSerializer<L, R> restoreSerializer() {
checkState(nestedSnapshot != null);
return new EitherSerializer<>(
nestedSnapshot.getRestoredNestedSerializer(0),
@@ -115,9 +116,10 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
if (newSerializer instanceof EitherSerializer) {
// delegate compatibility check to the new snapshot class
- EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
- JavaEitherSerializerSnapshot<L, R> newSnapshot = new JavaEitherSerializerSnapshot<>(serializer);
- return newSnapshot.resolveSchemaCompatibility(serializer);
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new JavaEitherSerializerSnapshot<>(),
+ nestedSnapshot.getNestedSerializerSnapshots());
}
else {
return TypeSerializerSchemaCompatibility.incompatible();