You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/19 11:48:44 UTC

[flink] branch release-1.11 updated (6178496 -> 91a4c2a)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6178496  [FLINK-17728] [sql-client] sql client supports parser statements via sql parser
     new 637d1de  [FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot
     new 827f233  [FLINK-17520] [test] Simplify CompositeTypeSerializerSnapshotTest
     new 982a503  [FLINK-17520] [test] Add test case for outer snapshot requiring migration
     new 01144bc  [FLINK-17520] [core] Rework all implementations of CompositeTypeSerializerSnapshot#resolveOuterSchemaCompatibility
     new 91a4c2a  [FLINK-17520] [doc] Use new resolveOuterSchemaCompatibility in custom serialization docs

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/state/custom_serialization.md      |  8 +-
 docs/dev/stream/state/custom_serialization.zh.md   | 24 +++---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 78 ++++++++++++++++----
 .../base/GenericArraySerializerSnapshot.java       |  6 +-
 .../java/typeutils/runtime/NullableSerializer.java |  6 +-
 .../CompositeTypeSerializerSnapshotTest.java       | 85 ++++++++++++----------
 .../ScalaCaseClassSerializerSnapshot.java          |  6 +-
 .../typeutils/TraversableSerializerSnapshot.java   |  6 +-
 .../Tuple2CaseClassSerializerSnapshot.java         |  6 +-
 9 files changed, 148 insertions(+), 77 deletions(-)


[flink] 02/05: [FLINK-17520] [test] Simplify CompositeTypeSerializerSnapshotTest

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 827f2337f18c86540c0d2d7c25ecf5eb566a5b4f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 13:00:56 2020 +0800

    [FLINK-17520] [test] Simplify CompositeTypeSerializerSnapshotTest
    
    The unit tests in CompositeTypeSerializerSnapshotTest were previously
    over-engineered w.r.t. how to mock the result of a compatibility check.
    It was doing string equals checks, whereas it is simple enough to just
    let the new serializer wrap a mock compat result and just return that
    for the compatibility checks.
---
 .../CompositeTypeSerializerSnapshotTest.java       | 66 ++++++++++------------
 1 file changed, 29 insertions(+), 37 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
index 1ffbfc2..f0fcbd7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -30,6 +30,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 
+import static org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.OuterSchemaCompatibility;
+
 /**
  * Test suite for the {@link CompositeTypeSerializerSnapshot}.
  */
@@ -41,7 +43,6 @@ public class CompositeTypeSerializerSnapshotTest {
 
 	@Test
 	public void testIncompatiblePrecedence() throws IOException {
-		final String OUTER_CONFIG = "outer-config";
 		final TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
@@ -53,15 +54,13 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				testNestedSerializers,
 				testNestedSerializers,
-				OUTER_CONFIG,
-				OUTER_CONFIG);
+				OuterSchemaCompatibility.COMPATIBLE_AS_IS);
 
 		Assert.assertTrue(compatibility.isIncompatible());
 	}
 
 	@Test
 	public void testCompatibleAfterMigrationPrecedence() throws IOException {
-		final String OUTER_CONFIG = "outer-config";
 		TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
@@ -73,15 +72,13 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				testNestedSerializers,
 				testNestedSerializers,
-				OUTER_CONFIG,
-				OUTER_CONFIG);
+				OuterSchemaCompatibility.COMPATIBLE_AS_IS);
 
 		Assert.assertTrue(compatibility.isCompatibleAfterMigration());
 	}
 
 	@Test
 	public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
-		final String OUTER_CONFIG = "outer-config";
 		TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
@@ -92,8 +89,7 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				testNestedSerializers,
 				testNestedSerializers,
-				OUTER_CONFIG,
-				OUTER_CONFIG);
+				OuterSchemaCompatibility.COMPATIBLE_AS_IS);
 
 		Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
 
@@ -108,7 +104,6 @@ public class CompositeTypeSerializerSnapshotTest {
 
 	@Test
 	public void testCompatibleAsIsPrecedence() throws IOException {
-		final String OUTER_CONFIG = "outer-config";
 		TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
@@ -118,16 +113,13 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				testNestedSerializers,
 				testNestedSerializers,
-				OUTER_CONFIG,
-				OUTER_CONFIG);
+				OuterSchemaCompatibility.COMPATIBLE_AS_IS);
 
 		Assert.assertTrue(compatibility.isCompatibleAsIs());
 	}
 
 	@Test
 	public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
-		final String INIT_OUTER_CONFIG = "outer-config";
-		final String INCOMPAT_OUTER_CONFIG = "incompat-outer-config";
 		TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 		};
@@ -136,8 +128,7 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				testNestedSerializers,
 				testNestedSerializers,
-				INIT_OUTER_CONFIG,
-				INCOMPAT_OUTER_CONFIG);
+				OuterSchemaCompatibility.INCOMPATIBLE);
 
 		// even though nested serializers are compatible, incompatibility of the outer
 		// snapshot should have higher precedence in the final result
@@ -146,8 +137,6 @@ public class CompositeTypeSerializerSnapshotTest {
 
 	@Test
 	public void testNestedFieldSerializerArityMismatchPrecedence() throws IOException {
-		final String OUTER_CONFIG = "outer-config";
-
 		final TypeSerializer<?>[] initialNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 		};
@@ -162,8 +151,7 @@ public class CompositeTypeSerializerSnapshotTest {
 			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 				initialNestedSerializers,
 				newNestedSerializers,
-				OUTER_CONFIG,
-				OUTER_CONFIG);
+				OuterSchemaCompatibility.COMPATIBLE_AS_IS);
 
 		// arity mismatch in the nested serializers should return incompatible as the result
 		Assert.assertTrue(compatibility.isIncompatible());
@@ -172,10 +160,9 @@ public class CompositeTypeSerializerSnapshotTest {
 	private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
 			TypeSerializer<?>[] initialNestedSerializers,
 			TypeSerializer<?>[] newNestedSerializer,
-			String initialOuterConfiguration,
-			String newOuterConfiguration) throws IOException {
+			OuterSchemaCompatibility mockOuterSchemaCompatibilityResult) throws IOException {
 		TestCompositeTypeSerializer testSerializer =
-			new TestCompositeTypeSerializer(initialOuterConfiguration, initialNestedSerializers);
+			new TestCompositeTypeSerializer(initialNestedSerializers);
 
 		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
 
@@ -187,7 +174,7 @@ public class CompositeTypeSerializerSnapshotTest {
 			in, Thread.currentThread().getContextClassLoader());
 
 		TestCompositeTypeSerializer newTestSerializer =
-			new TestCompositeTypeSerializer(newOuterConfiguration, newNestedSerializer);
+			new TestCompositeTypeSerializer(mockOuterSchemaCompatibilityResult, newNestedSerializer);
 		return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
 	}
 
@@ -205,7 +192,7 @@ public class CompositeTypeSerializerSnapshotTest {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
 		};
 
-		TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
+		TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer(testNestedSerializers);
 
 		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
 
@@ -238,19 +225,24 @@ public class CompositeTypeSerializerSnapshotTest {
 
 		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
 
-		private final String outerConfiguration;
+		private final OuterSchemaCompatibility mockOuterSchemaCompatibility;
 
 		private final TypeSerializer<?>[] nestedSerializers;
 
+		TestCompositeTypeSerializer(TypeSerializer<?>[] nestedSerializers) {
+			this.mockOuterSchemaCompatibility = OuterSchemaCompatibility.COMPATIBLE_AS_IS;
+			this.nestedSerializers = nestedSerializers;
+		}
+
 		TestCompositeTypeSerializer(
-				String outerConfiguration,
+				OuterSchemaCompatibility mockOuterSchemaCompatibility,
 				TypeSerializer<?>[] nestedSerializers) {
-			this.outerConfiguration = outerConfiguration;
+			this.mockOuterSchemaCompatibility = mockOuterSchemaCompatibility;
 			this.nestedSerializers = nestedSerializers;
 		}
 
-		public String getOuterConfiguration() {
-			return outerConfiguration;
+		public OuterSchemaCompatibility getMockOuterSchemaCompatibility() {
+			return mockOuterSchemaCompatibility;
 		}
 
 		TypeSerializer<?>[] getNestedSerializers() {
@@ -335,7 +327,7 @@ public class CompositeTypeSerializerSnapshotTest {
 	 */
 	public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {
 
-		private String outerConfiguration;
+		private OuterSchemaCompatibility mockOuterSchemaCompatibility;
 
 		public TestCompositeTypeSerializerSnapshot() {
 			super(TestCompositeTypeSerializer.class);
@@ -343,12 +335,12 @@ public class CompositeTypeSerializerSnapshotTest {
 
 		TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
 			super(serializer);
-			this.outerConfiguration = serializer.getOuterConfiguration();
+			this.mockOuterSchemaCompatibility = serializer.getMockOuterSchemaCompatibility();
 		}
 
 		@Override
 		protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
-			return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
+			return new TestCompositeTypeSerializer(mockOuterSchemaCompatibility, nestedSerializers);
 		}
 
 		@Override
@@ -358,18 +350,18 @@ public class CompositeTypeSerializerSnapshotTest {
 
 		@Override
 		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
-			out.writeUTF(outerConfiguration);
+			out.writeInt(mockOuterSchemaCompatibility.ordinal());
 		}
 
 		@Override
 		public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 			Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
-			this.outerConfiguration = in.readUTF();
+			this.mockOuterSchemaCompatibility = OuterSchemaCompatibility.values()[in.readInt()];
 		}
 
 		@Override
-		protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
-			return outerConfiguration.equals(newSerializer.getOuterConfiguration());
+		protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(TestCompositeTypeSerializer newSerializer) {
+			return newSerializer.getMockOuterSchemaCompatibility();
 		}
 
 		@Override


[flink] 01/05: [FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 637d1de38b5911ac594f97af7d85d14434595e84
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 12:58:06 2020 +0800

    [FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow migration based on outer snapshot
    
    This commit deprecates isOuterSnapshotCompatible, which only allows
    signaling if the outer config is either compatible or not compatible, in
    favor of a new resolveOuterSchemaCompatibility method which additionally
    allows the user to signal migration.
    
    The change is backwards compatible, and allows subclasses that still
    only implement isOuterSnapshotCompatible to work as is.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 78 +++++++++++++++++-----
 1 file changed, 63 insertions(+), 15 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index fc4a40d..5e6c84e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -45,9 +45,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
  * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
- * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for its serializer snapshot
- * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
- * subclasses do not have any outer snapshot that needs to be persisted.
+ * {@link #resolveOuterSchemaCompatibility(TypeSerializer)} when using this class as the base
+ * for its serializer snapshot class. By default, the base implementations of these methods are empty, i.e. this
+ * class assumes that subclasses do not have any outer snapshot that needs to be persisted.
  *
  * <h2>Snapshot Versioning</h2>
  *
@@ -82,6 +82,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer<T>> implements TypeSerializerSnapshot<T> {
 
+	/**
+	 * Indicates schema compatibility of the serializer configuration persisted as the outer snapshot.
+	 */
+	protected enum OuterSchemaCompatibility {
+		COMPATIBLE_AS_IS,
+		COMPATIBLE_AFTER_MIGRATION,
+		INCOMPATIBLE
+	}
+
 	/** Magic number for integrity checks during deserialization. */
 	private static final int MAGIC_NUMBER = 911108;
 
@@ -168,10 +177,8 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 		S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
 
-		// check that outer configuration is compatible; if not, short circuit result
-		if (!isOuterSnapshotCompatible(castedNewSerializer)) {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
+		final OuterSchemaCompatibility outerSchemaCompatibility =
+			resolveOuterSchemaCompatibility(castedNewSerializer);
 
 		final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
 		// check that nested serializer arity remains identical; if not, short circuit result
@@ -179,7 +186,10 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 
-		return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots);
+		return constructFinalSchemaCompatibilityResult(
+			newNestedSerializers,
+			snapshots,
+			outerSchemaCompatibility);
 	}
 
 	@Internal
@@ -237,7 +247,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
 	 * some extra information that needs to be persisted as part of the serializer snapshot, this
 	 * must be overridden. Note that this method and the corresponding methods
-	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
 	 * needs to be implemented.
 	 *
 	 * @param out the {@link DataOutputView} to write the outer snapshot to.
@@ -251,7 +261,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
 	 * some extra information that has been persisted as part of the serializer snapshot, this
 	 * must be overridden. Note that this method and the corresponding methods
-	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
 	 * needs to be implemented.
 	 *
 	 * @param readOuterSnapshotVersion the read version of the outer snapshot.
@@ -275,11 +285,38 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	 *
 	 * @return a flag indicating whether or not the new serializer's outer information is compatible with the one
 	 *         written in this snapshot.
+	 *
+	 * @deprecated this method is deprecated, and will be removed in the future.
+	 *             Please implement {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
 	 */
+	@Deprecated
 	protected boolean isOuterSnapshotCompatible(S newSerializer) {
 		return true;
 	}
 
+	/**
+	 * Checks the schema compatibility of the given new serializer based on the outer snapshot.
+	 *
+	 * <p>The base implementation of this method assumes that the outer serializer
+	 * only has nested serializers and no extra information, and therefore the result of the check is
+	 * {@link OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+	 * needs to be implemented.
+	 *
+	 * @param newSerializer the new serializer, which contains the new outer information to check against.
+	 *
+	 * @return an {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
+	 *         information is compatible, requires migration, or is incompatible with the one written
+	 *         in this snapshot.
+	 */
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(S newSerializer) {
+		return (isOuterSnapshotCompatible(newSerializer))
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
+	}
+
 	// ------------------------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------------------------
@@ -311,17 +348,28 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
 			TypeSerializer<?>[] newNestedSerializers,
-			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
+			OuterSchemaCompatibility outerSchemaCompatibility) {
 
-		IntermediateCompatibilityResult<T> intermediateResult =
+		IntermediateCompatibilityResult<T> nestedSerializersCompatibilityResult =
 			CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(newNestedSerializers, nestedSerializerSnapshots);
 
-		if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
+		if (outerSchemaCompatibility == OuterSchemaCompatibility.INCOMPATIBLE
+				|| nestedSerializersCompatibilityResult.isIncompatible()) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		if (outerSchemaCompatibility == OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION
+				|| nestedSerializersCompatibilityResult.isCompatibleAfterMigration()) {
+			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+		}
+
+		if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
 			@SuppressWarnings("unchecked")
-			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(intermediateResult.getNestedSerializers());
+			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(nestedSerializersCompatibilityResult.getNestedSerializers());
 			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
 		}
 
-		return intermediateResult.getFinalResult();
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
 	}
 }


[flink] 04/05: [FLINK-17520] [core] Rework all implementations of CompositeTypeSerializerSnapshot#resolveOuterSchemaCompatibility

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01144bce59ce18cc229e90eb4f1c167725654693
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 13:46:12 2020 +0800

    [FLINK-17520] [core] Rework all implementations of CompositeTypeSerializerSnapshot#resolveOuterSchemaCompatibility
---
 .../api/common/typeutils/base/GenericArraySerializerSnapshot.java   | 6 ++++--
 .../apache/flink/api/java/typeutils/runtime/NullableSerializer.java | 6 ++++--
 .../flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java | 6 ++++--
 .../flink/api/scala/typeutils/TraversableSerializerSnapshot.java    | 6 ++++--
 .../api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java      | 6 ++++--
 5 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index f517ec8..6e1c38f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -78,8 +78,10 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
 	}
 
 	@Override
-	protected boolean isOuterSnapshotCompatible(GenericArraySerializer<C> newSerializer) {
-		return this.componentClass == newSerializer.getComponentClass();
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(GenericArraySerializer<C> newSerializer) {
+		return (this.componentClass == newSerializer.getComponentClass())
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index 8d26fe1..c4cd0f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -348,8 +348,10 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		}
 
 		@Override
-		protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
-			return nullPaddingLength == newSerializer.nullPaddingLength();
+		protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(NullableSerializer<T> newSerializer) {
+			return (nullPaddingLength == newSerializer.nullPaddingLength())
+				? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+				: OuterSchemaCompatibility.INCOMPATIBLE;
 		}
 	}
 
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
index 886a59c..0145d2a 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
@@ -103,7 +103,9 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product>
 	}
 
 	@Override
-	protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<T> newSerializer) {
-		return Objects.equals(type, newSerializer.getTupleClass());
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<T> newSerializer) {
+		return (Objects.equals(type, newSerializer.getTupleClass()))
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
 	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
index 4cabceb..08d7ad6 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
@@ -98,7 +98,9 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E>
 	}
 
 	@Override
-	protected boolean isOuterSnapshotCompatible(TraversableSerializer<T, E> newSerializer) {
-		return cbfCode.equals(newSerializer.cbfCode());
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(TraversableSerializer<T, E> newSerializer) {
+		return (cbfCode.equals(newSerializer.cbfCode()))
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
 	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
index 9459181..9591663 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
@@ -95,8 +95,10 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2>
 	}
 
 	@Override
-	protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<Tuple2<T1, T2>> newSerializer) {
-		return Objects.equals(type, newSerializer.getTupleClass());
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<Tuple2<T1, T2>> newSerializer) {
+		return (Objects.equals(type, newSerializer.getTupleClass()))
+			? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+			: OuterSchemaCompatibility.INCOMPATIBLE;
 	}
 
 	private static <T1, T2> Class<ScalaCaseClassSerializer<scala.Tuple2<T1, T2>>> correspondingSerializerClass() {


[flink] 05/05: [FLINK-17520] [doc] Use new resolveOuterSchemaCompatibility in custom serialization docs

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91a4c2af08fb874ecd264c8af36709c6a340009d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 13:47:03 2020 +0800

    [FLINK-17520] [doc] Use new resolveOuterSchemaCompatibility in custom serialization docs
    
    This closes #12209.
---
 docs/dev/stream/state/custom_serialization.md    |  8 +++++---
 docs/dev/stream/state/custom_serialization.zh.md | 24 +++++++++++++-----------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md
index 79f05a3..e741302 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -309,7 +309,7 @@ the nested element serializer.
 In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
  * `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
  * `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
- * `#isOuterSnapshotCompatible(TypeSerializer)`: checks whether the outer snapshot information remains identical.
+ * `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.
 
 By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
 read / write, and therefore have empty default implementations for the above methods. If the subclass
@@ -351,8 +351,10 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
     }
 
     @Override
-    protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
-        return this.componentClass == newSerializer.getComponentClass();
+    protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
+        return (this.componentClass == newSerializer.getComponentClass())
+            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+            : OuterSchemaCompatibility.INCOMPATIBLE;
     }
 
     @Override
diff --git a/docs/dev/stream/state/custom_serialization.zh.md b/docs/dev/stream/state/custom_serialization.zh.md
index daeea17..2706e80 100644
--- a/docs/dev/stream/state/custom_serialization.zh.md
+++ b/docs/dev/stream/state/custom_serialization.zh.md
@@ -87,7 +87,7 @@ type and the *serialized binary format* of a state type. The schema, generally s
  1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state.
  2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
  3. Configuration of the serializer has changed.
-
+ 
 In order for the new execution to have information about the *written schema* of state and detect whether or not the
 schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be
 written along with the state bytes. This is abstracted a `TypeSerializerSnapshot`, explained in the next subsection.
@@ -108,10 +108,10 @@ public interface TypeSerializerSnapshot<T> {
 
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public abstract class TypeSerializer<T> {
-
+public abstract class TypeSerializer<T> {    
+    
     // ...
-
+    
     public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
 }
 {% endhighlight %}
@@ -140,7 +140,7 @@ which can be one of the following:
  2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a
  different serialization schema, and it is possible to migrate from the old schema by using the previous serializer
  (which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with
- the new serializer (which recognizes the new schema).
+ the new serializer (which recognizes the new schema). 
  3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a
  different serialization schema, but it is not possible to migrate from the old schema.
 
@@ -170,13 +170,13 @@ to the implementation of state serializers and their serializer snapshots.
   - Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
   `TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
  4. **Migrate state bytes in backend from schema _A_ to schema _B_**
-  - If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
+  - If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is 
   performed. The previous state serializer which recognizes schema _A_ will be obtained from the serializer snapshot, via
    `TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects, which in turn
    are re-written again with the new serializer, which recognizes schema _B_ to complete the migration. All entries
    of the accessed state are migrated all together before processing continues.
   - If the resolution signals incompatibility, then the state access fails with an exception.
-
+ 
 #### Heap state backends (e.g. `MemoryStateBackend`, `FsStateBackend`)
 
  1. **Register new state with a state serializer that has schema _A_**
@@ -218,7 +218,7 @@ as your serializer's snapshot class:
 
  - `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer class remains identical, or
  - `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer class is different from the previous one.
-
+ 
 Below is an example of how the `SimpleTypeSerializerSnapshot` is used, using Flink's `IntSerializer` as an example:
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -309,7 +309,7 @@ the nested element serializer.
 In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
  * `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
  * `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
- * `#isOuterSnapshotCompatible(TypeSerializer)`: checks whether the outer snapshot information remains identical.
+ * `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.
 
 By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
 read / write, and therefore has empty default implementations for the above methods. If the subclass
@@ -351,8 +351,10 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
     }
 
     @Override
-    protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
-        return this.componentClass == newSerializer.getComponentClass();
+    protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
+        return (this.componentClass == newSerializer.getComponentClass())
+            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+            : OuterSchemaCompatibility.INCOMPATIBLE;
     }
 
     @Override


[flink] 03/05: [FLINK-17520] [test] Add test case for outer snapshot requiring migration

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 982a50328df6fe173a48bbaa0d5757df55b9238a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon May 18 13:07:27 2020 +0800

    [FLINK-17520] [test] Add test case for outer snapshot requiring migration
---
 .../CompositeTypeSerializerSnapshotTest.java          | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
index f0fcbd7..907da62 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -119,7 +119,7 @@ public class CompositeTypeSerializerSnapshotTest {
 	}
 
 	@Test
-	public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
+	public void testOuterSnapshotIncompatiblePrecedence() throws IOException {
 		TypeSerializer<?>[] testNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
 		};
@@ -136,6 +136,23 @@ public class CompositeTypeSerializerSnapshotTest {
 	}
 
 	@Test
+	public void testOuterSnapshotRequiresMigrationPrecedence() throws IOException {
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				testNestedSerializers,
+				OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION);
+
+		// even though nested serializers can be compatible with reconfiguration, the outer
+		// snapshot requiring migration should have higher precedence in the final result
+		Assert.assertTrue(compatibility.isCompatibleAfterMigration());
+	}
+
+	@Test
 	public void testNestedFieldSerializerArityMismatchPrecedence() throws IOException {
 		final TypeSerializer<?>[] initialNestedSerializers = {
 			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),