You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/18 08:48:56 UTC

[flink] branch master updated (f012270 -> 8e7b80f)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f012270  [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase
     new c086cf0  [hotfix] [tests] Fix incorrect EitherSerializerSnapshot migration test files
     new 8e7b80f  [FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../typeutils/CompositeTypeSerializerSnapshot.java |  11 ++++-
 .../typeutils/CompositeTypeSerializerUtil.java     |  51 +++++++++++++++++++++
 .../base/CollectionSerializerConfigSnapshot.java   |  18 ++++----
 .../base/GenericArraySerializerConfigSnapshot.java |  18 +++++---
 .../base/GenericArraySerializerSnapshot.java       |   9 ++++
 .../base/MapSerializerConfigSnapshot.java          |  15 ++++--
 .../runtime/EitherSerializerConfigSnapshot.java    |  33 ++++---------
 .../runtime/EitherSerializerSnapshot.java          |  10 ++--
 .../resources/flink-1.7-either-serializer-data     | Bin 130 -> 130 bytes
 .../resources/flink-1.7-either-serializer-snapshot | Bin 383 -> 383 bytes
 10 files changed, 116 insertions(+), 49 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java


[flink] 02/02: [FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e7b80f755efc257d7a2803fc604e2746b6530a3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 16 18:06:16 2019 +0100

    [FLINK-11372] [core] Incorrect delegation of compatibility checks to new CompositeTypeSerializerSnapshots
    
    This closes #7521.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 11 ++++-
 .../typeutils/CompositeTypeSerializerUtil.java     | 51 ++++++++++++++++++++++
 .../base/CollectionSerializerConfigSnapshot.java   | 18 ++++----
 .../base/GenericArraySerializerConfigSnapshot.java | 18 +++++---
 .../base/GenericArraySerializerSnapshot.java       |  9 ++++
 .../base/MapSerializerConfigSnapshot.java          | 15 +++++--
 .../runtime/EitherSerializerConfigSnapshot.java    | 33 +++++---------
 .../runtime/EitherSerializerSnapshot.java          | 10 +++--
 8 files changed, 116 insertions(+), 49 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index c73e24c..49ff8a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
@@ -147,6 +148,13 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	@Override
 	public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+		return internalResolveSchemaCompatibility(newSerializer, nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+	}
+
+	@Internal
+	TypeSerializerSchemaCompatibility<T> internalResolveSchemaCompatibility(
+			TypeSerializer<T> newSerializer,
+			TypeSerializerSnapshot<?>[] snapshots) {
 		if (newSerializer.getClass() != correspondingSerializerClass) {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
@@ -158,10 +166,9 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 
-		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
 		return constructFinalSchemaCompatibilityResult(
 			getNestedSerializers(castedNewSerializer),
-			nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+			snapshots);
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
new file mode 100644
index 0000000..3c162e3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utilities for the {@link CompositeTypeSerializerSnapshot}.
+ */
+@Internal
+public class CompositeTypeSerializerUtil {
+
+	/**
+	 * Delegates compatibility checks to a {@link CompositeTypeSerializerSnapshot} instance.
+	 * This can be used by legacy snapshot classes, which have a newer implementation
+	 * implemented as a {@link CompositeTypeSerializerSnapshot}.
+	 *
+	 * @param newSerializer the new serializer to check for compatibility.
+	 * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility checks to.
+	 *                             This instance should already contain the outer snapshot information.
+	 * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite snapshot.
+	 *
+	 * @return the result compatibility.
+	 */
+	public static <T> TypeSerializerSchemaCompatibility<T> delegateCompatibilityCheckToNewSnapshot(
+			TypeSerializer<T> newSerializer,
+			CompositeTypeSerializerSnapshot<T, ? extends TypeSerializer> newCompositeSnapshot,
+			TypeSerializerSnapshot<?>... legacyNestedSnapshots) {
+
+		checkArgument(legacyNestedSnapshots.length > 0);
+		return newCompositeSnapshot.internalResolveSchemaCompatibility(newSerializer, legacyNestedSnapshots);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 762a441..b485ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 
@@ -47,18 +49,16 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
-		if (newSerializer instanceof ListSerializer) {
-			ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
-			ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
-
-			@SuppressWarnings("unchecked")
-			TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
-				listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
-			return result;
-		} else {
+		if (!(newSerializer instanceof ListSerializer)) {
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
+
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			(CompositeTypeSerializerSnapshot<C, ? extends TypeSerializer>) new ListSerializerSnapshot<>(),
+			getSingleNestedSerializerAndConfig().f1);
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index cfc2e98..380911e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -115,20 +116,23 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	}
 
 	@Override
-	public TypeSerializer<C[]> restoreSerializer() {
+	public GenericArraySerializer<C> restoreSerializer() {
 		checkState(componentClass != null && nestedSnapshot != null);
 		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
 	}
 
 	@Override
 	public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-		if (newSerializer instanceof GenericArraySerializer) {
-			// delegate to the new snapshot class
-			GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
-			GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
-			return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
-		} else {
+		checkState(nestedSnapshot != null);
+
+		if (!(newSerializer instanceof GenericArraySerializer)) {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
+
+		// delegate to the new snapshot class
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			new GenericArraySerializerSnapshot<>(componentClass),
+			nestedSnapshot.getNestedSerializerSnapshots());
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index 3f54dee..d39b58e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -52,6 +52,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
 		this.componentClass = genericArraySerializer.getComponentClass();
 	}
 
+	/**
+	 * Constructor that the legacy {@link GenericArraySerializerConfigSnapshot} uses
+	 * to delegate compatibility checks to this class.
+	 */
+	GenericArraySerializerSnapshot(Class<C> componentClass) {
+		super(GenericArraySerializer.class);
+		this.componentClass = componentClass;
+	}
+
 	@Override
 	protected int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 2b78b52..8c7e128 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -20,9 +20,13 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -47,11 +51,14 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
 	@Override
 	public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
 		if (newSerializer instanceof MapSerializer) {
-			// redirect the compatibility check to the new MapSerializerConfigSnapshot
-			MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
 
-			MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
-			return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+			// redirect the compatibility check to the new MapSerializerSnapshot
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new MapSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1,
+				nestedSerializersAndConfigs.get(1).f1);
 		}
 		else {
 			return super.resolveSchemaCompatibility(newSerializer);
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index d186953..a4863b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -20,11 +20,15 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Either;
 
+import java.util.List;
+
 /**
  * Deprecated config snapshot retained for savepoint compatibility with Flink 1.6 and earlier.
  */
@@ -51,7 +55,12 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
 		// this class was shared between the Java Either Serializer and the
 		// Scala Either serializer
 		if (newSerializer.getClass() == EitherSerializer.class) {
-			return checkJavaSerializerCompatibility((EitherSerializer<L, R>) newSerializer);
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new JavaEitherSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1,
+				nestedSerializersAndConfigs.get(1).f1);
 		}
 		else {
 			// Scala Either Serializer, or other.
@@ -59,26 +68,4 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
-	private TypeSerializerSchemaCompatibility<Either<L, R>> checkJavaSerializerCompatibility(
-			EitherSerializer<L, R> serializer) {
-
-		TypeSerializer<L> leftSerializer = serializer.getLeftSerializer();
-		TypeSerializer<R> rightSerializer = serializer.getRightSerializer();
-
-		TypeSerializerSnapshot<L> leftSnapshot = (TypeSerializerSnapshot<L>) getNestedSerializersAndConfigs().get(0).f1;
-		TypeSerializerSnapshot<R> rightSnapshot = (TypeSerializerSnapshot<R>) getNestedSerializersAndConfigs().get(1).f1;
-
-		TypeSerializerSchemaCompatibility<?> leftCompatibility = leftSnapshot.resolveSchemaCompatibility(leftSerializer);
-		TypeSerializerSchemaCompatibility<?> rightCompatibility = rightSnapshot.resolveSchemaCompatibility(rightSerializer);
-
-		if (leftCompatibility.isCompatibleAsIs() && rightCompatibility.isCompatibleAsIs()) {
-			return TypeSerializerSchemaCompatibility.compatibleAsIs();
-		}
-		if (leftCompatibility.isCompatibleAfterMigration() && rightCompatibility.isCompatibleAfterMigration()) {
-			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-		}
-		return TypeSerializerSchemaCompatibility.incompatible();
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 1779ec8..1c84366 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -101,7 +102,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	}
 
 	@Override
-	public TypeSerializer<Either<L, R>> restoreSerializer() {
+	public EitherSerializer<L, R> restoreSerializer() {
 		checkState(nestedSnapshot != null);
 		return new EitherSerializer<>(
 				nestedSnapshot.getRestoredNestedSerializer(0),
@@ -115,9 +116,10 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 
 		if (newSerializer instanceof EitherSerializer) {
 			// delegate compatibility check to the new snapshot class
-			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
-			JavaEitherSerializerSnapshot<L, R> newSnapshot = new JavaEitherSerializerSnapshot<>(serializer);
-			return newSnapshot.resolveSchemaCompatibility(serializer);
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new JavaEitherSerializerSnapshot<>(),
+				nestedSnapshot.getNestedSerializerSnapshots());
 		}
 		else {
 			return TypeSerializerSchemaCompatibility.incompatible();


[flink] 01/02: [hotfix] [tests] Fix incorrect EitherSerializerSnapshot migration test files

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c086cf05af6ff4560329fd36ae46c14f4844d480
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jan 17 15:34:55 2019 +0100

    [hotfix] [tests] Fix incorrect EitherSerializerSnapshot migration test files
---
 .../src/test/resources/flink-1.7-either-serializer-data | Bin 130 -> 130 bytes
 .../test/resources/flink-1.7-either-serializer-snapshot | Bin 383 -> 383 bytes
 2 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/flink-core/src/test/resources/flink-1.7-either-serializer-data b/flink-core/src/test/resources/flink-1.7-either-serializer-data
index 203067c..3aabc0b 100644
Binary files a/flink-core/src/test/resources/flink-1.7-either-serializer-data and b/flink-core/src/test/resources/flink-1.7-either-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot
index 7f2cf0f..16242c0 100644
Binary files a/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot and b/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot differ