You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/18 08:48:58 UTC

[flink] 02/02: [FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e7b80f755efc257d7a2803fc604e2746b6530a3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 16 18:06:16 2019 +0100

    [FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots
    
    This closes #7521.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 11 ++++-
 .../typeutils/CompositeTypeSerializerUtil.java     | 51 ++++++++++++++++++++++
 .../base/CollectionSerializerConfigSnapshot.java   | 18 ++++----
 .../base/GenericArraySerializerConfigSnapshot.java | 18 +++++---
 .../base/GenericArraySerializerSnapshot.java       |  9 ++++
 .../base/MapSerializerConfigSnapshot.java          | 15 +++++--
 .../runtime/EitherSerializerConfigSnapshot.java    | 33 +++++---------
 .../runtime/EitherSerializerSnapshot.java          | 10 +++--
 8 files changed, 116 insertions(+), 49 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index c73e24c..49ff8a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
@@ -147,6 +148,13 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	@Override
 	public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+		return internalResolveSchemaCompatibility(newSerializer, nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+	}
+
+	@Internal
+	TypeSerializerSchemaCompatibility<T> internalResolveSchemaCompatibility(
+			TypeSerializer<T> newSerializer,
+			TypeSerializerSnapshot<?>[] snapshots) {
 		if (newSerializer.getClass() != correspondingSerializerClass) {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
@@ -158,10 +166,9 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 
-		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
 		return constructFinalSchemaCompatibilityResult(
 			getNestedSerializers(castedNewSerializer),
-			nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+			snapshots);
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
new file mode 100644
index 0000000..3c162e3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utilities for the {@link CompositeTypeSerializerSnapshot}.
+ */
+@Internal
+public class CompositeTypeSerializerUtil {
+
+	/**
+	 * Delegates compatibility checks to a {@link CompositeTypeSerializerSnapshot} instance.
+	 * This can be used by legacy snapshot classes, which have a newer implementation
+	 * implemented as a {@link CompositeTypeSerializerSnapshot}.
+	 *
+	 * @param newSerializer the new serializer to check for compatibility.
+	 * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility checks to.
+	 *                             This instance should already contain the outer snapshot information.
+	 * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite snapshot.
+	 *
+	 * @return the result compatibility.
+	 */
+	public static <T> TypeSerializerSchemaCompatibility<T> delegateCompatibilityCheckToNewSnapshot(
+			TypeSerializer<T> newSerializer,
+			CompositeTypeSerializerSnapshot<T, ? extends TypeSerializer> newCompositeSnapshot,
+			TypeSerializerSnapshot<?>... legacyNestedSnapshots) {
+
+		checkArgument(legacyNestedSnapshots.length > 0);
+		return newCompositeSnapshot.internalResolveSchemaCompatibility(newSerializer, legacyNestedSnapshots);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 762a441..b485ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 
@@ -47,18 +49,16 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
-		if (newSerializer instanceof ListSerializer) {
-			ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
-			ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
-
-			@SuppressWarnings("unchecked")
-			TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
-				listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
-			return result;
-		} else {
+		if (!(newSerializer instanceof ListSerializer)) {
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
+
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			(CompositeTypeSerializerSnapshot<C, ? extends TypeSerializer>) new ListSerializerSnapshot<>(),
+			getSingleNestedSerializerAndConfig().f1);
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index cfc2e98..380911e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -115,20 +116,23 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	}
 
 	@Override
-	public TypeSerializer<C[]> restoreSerializer() {
+	public GenericArraySerializer<C> restoreSerializer() {
 		checkState(componentClass != null && nestedSnapshot != null);
 		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
 	}
 
 	@Override
 	public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-		if (newSerializer instanceof GenericArraySerializer) {
-			// delegate to the new snapshot class
-			GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
-			GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
-			return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
-		} else {
+		checkState(nestedSnapshot != null);
+
+		if (!(newSerializer instanceof GenericArraySerializer)) {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
+
+		// delegate to the new snapshot class
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			new GenericArraySerializerSnapshot<>(componentClass),
+			nestedSnapshot.getNestedSerializerSnapshots());
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index 3f54dee..d39b58e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -52,6 +52,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
 		this.componentClass = genericArraySerializer.getComponentClass();
 	}
 
+	/**
+	 * Constructor that the legacy {@link GenericArraySerializerConfigSnapshot} uses
+	 * to delegate compatibility checks to this class.
+	 */
+	GenericArraySerializerSnapshot(Class<C> componentClass) {
+		super(GenericArraySerializer.class);
+		this.componentClass = componentClass;
+	}
+
 	@Override
 	protected int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 2b78b52..8c7e128 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -20,9 +20,13 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -47,11 +51,14 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
 	@Override
 	public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
 		if (newSerializer instanceof MapSerializer) {
-			// redirect the compatibility check to the new MapSerializerConfigSnapshot
-			MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
 
-			MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
-			return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+			// redirect the compatibility check to the new MapSerializerSnapshot
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new MapSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1,
+				nestedSerializersAndConfigs.get(1).f1);
 		}
 		else {
 			return super.resolveSchemaCompatibility(newSerializer);
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index d186953..a4863b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -20,11 +20,15 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Either;
 
+import java.util.List;
+
 /**
  * Deprecated config snapshot retained for savepoint compatibility with Flink 1.6 and earlier.
  */
@@ -51,7 +55,12 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
 		// this class was shared between the Java Either Serializer and the
 		// Scala Either serializer
 		if (newSerializer.getClass() == EitherSerializer.class) {
-			return checkJavaSerializerCompatibility((EitherSerializer<L, R>) newSerializer);
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new JavaEitherSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1,
+				nestedSerializersAndConfigs.get(1).f1);
 		}
 		else {
 			// Scala Either Serializer, or other.
@@ -59,26 +68,4 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
-	private TypeSerializerSchemaCompatibility<Either<L, R>> checkJavaSerializerCompatibility(
-			EitherSerializer<L, R> serializer) {
-
-		TypeSerializer<L> leftSerializer = serializer.getLeftSerializer();
-		TypeSerializer<R> rightSerializer = serializer.getRightSerializer();
-
-		TypeSerializerSnapshot<L> leftSnapshot = (TypeSerializerSnapshot<L>) getNestedSerializersAndConfigs().get(0).f1;
-		TypeSerializerSnapshot<R> rightSnapshot = (TypeSerializerSnapshot<R>) getNestedSerializersAndConfigs().get(1).f1;
-
-		TypeSerializerSchemaCompatibility<?> leftCompatibility = leftSnapshot.resolveSchemaCompatibility(leftSerializer);
-		TypeSerializerSchemaCompatibility<?> rightCompatibility = rightSnapshot.resolveSchemaCompatibility(rightSerializer);
-
-		if (leftCompatibility.isCompatibleAsIs() && rightCompatibility.isCompatibleAsIs()) {
-			return TypeSerializerSchemaCompatibility.compatibleAsIs();
-		}
-		if (leftCompatibility.isCompatibleAfterMigration() && rightCompatibility.isCompatibleAfterMigration()) {
-			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-		}
-		return TypeSerializerSchemaCompatibility.incompatible();
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 1779ec8..1c84366 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -101,7 +102,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	}
 
 	@Override
-	public TypeSerializer<Either<L, R>> restoreSerializer() {
+	public EitherSerializer<L, R> restoreSerializer() {
 		checkState(nestedSnapshot != null);
 		return new EitherSerializer<>(
 				nestedSnapshot.getRestoredNestedSerializer(0),
@@ -115,9 +116,10 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 
 		if (newSerializer instanceof EitherSerializer) {
 			// delegate compatibility check to the new snapshot class
-			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
-			JavaEitherSerializerSnapshot<L, R> newSnapshot = new JavaEitherSerializerSnapshot<>(serializer);
-			return newSnapshot.resolveSchemaCompatibility(serializer);
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new JavaEitherSerializerSnapshot<>(),
+				nestedSnapshot.getNestedSerializerSnapshots());
 		}
 		else {
 			return TypeSerializerSchemaCompatibility.incompatible();