You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/20 19:47:54 UTC

[flink] 07/08: [hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f79ecc87d9f584bd03963c89c32e29a7b1ae379
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 20 12:11:09 2020 +0200

    [hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration
---
 .../typeutils/TypeSerializerUpgradeTestBase.java   | 29 ++++++++++++++--------
 .../PojoSerializerUpgradeTestSpecifications.java   | 21 ++++++++--------
 .../runtime/RowSerializerUpgradeTest.java          |  2 +-
 3 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 0a5ae1d..fe94ee9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -105,7 +105,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		 * that the serializer upgrade produced with an expected {@link
 		 * TypeSerializerSchemaCompatibility}.
 		 */
-		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
+		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version);
 	}
 
 	private static class ClassLoaderSafePreUpgradeSetup<PreviousElementT> implements PreUpgradeSetup<PreviousElementT> {
@@ -186,9 +186,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version) {
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
-				return delegateVerifier.schemaCompatibilityMatcher();
+				return delegateVerifier.schemaCompatibilityMatcher(version);
 			} catch (IOException e) {
 				throw new RuntimeException(
 						"Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
@@ -273,13 +273,14 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			assumeThat(
 					"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
 					TypeSerializerSchemaCompatibility.incompatible(),
-					not(testSpecification.verifier.schemaCompatibilityMatcher()));
+					not(testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion)));
 
 			TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot = snapshotUnderTest();
 
 			TypeSerializer<UpgradedElementT> restoredSerializer = restoredSerializerSnapshot.restoreSerializer();
 			assertSerializerIsValid(
 					restoredSerializer,
+					true,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -296,7 +297,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 			assertThat(
 					upgradeCompatibility,
-					testSpecification.verifier.schemaCompatibilityMatcher());
+					testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion));
 		}
 	}
 
@@ -325,6 +326,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			// .. and then assert that the upgraded serializer is valid with the migrated data
 			assertSerializerIsValid(
 					upgradedSerializer,
+					false,
 					migratedData,
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -346,6 +348,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer();
 			assertSerializerIsValid(
 					reconfiguredUpgradedSerializer,
+					false,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -366,6 +369,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 			assertSerializerIsValid(
 					upgradedSerializer,
+					false,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -378,14 +382,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	 * <p>A serializer is valid, iff:
 	 * <ul>
 	 *     <li>1. The serializer can read and then write again the given serialized data.
-	 *     <li>2. The serializer can produce a serializer snapshot which can be written and then read back again.
+	 *     <li>2. The serializer can produce a serializer snapshot which can be written and then read
+	 *            back again.
 	 *     <li>3. The serializer's produced snapshot is capable of creating a restore serializer.
 	 *     <li>4. The restore serializer created from the serializer snapshot can read and then
-	 *            write again data written by step 1.
+	 *            write again data written by step 1. Given that the serializer is not a restore
+	 *            serializer already.
 	 * </ul>
 	 */
 	private static <T> void assertSerializerIsValid(
 			TypeSerializer<T> serializer,
+			boolean isRestoreSerializer,
 			DataInputView dataInput,
 			Matcher<T> testDataMatcher) throws Exception {
 
@@ -394,9 +401,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 				serializer,
 				serializer,
 				testDataMatcher);
-		TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
-		TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
-		readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+		if (!isRestoreSerializer) {
+			TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
+			TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
+			readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+		}
 	}
 
 	// ------------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
index 4626924..f142401 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.hamcrest.Matcher;
 
@@ -228,7 +229,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAsIs();
 		}
 	}
@@ -330,7 +331,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAfterMigration();
 		}
 	}
@@ -395,7 +396,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isIncompatible();
 		}
 	}
@@ -519,7 +520,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAfterMigration();
 		}
 	}
@@ -592,7 +593,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isIncompatible();
 		}
 	}
@@ -631,7 +632,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -680,7 +681,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -727,7 +728,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -772,7 +773,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -819,7 +820,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index 602f92e..cba9789 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -106,7 +106,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAsIs();
 		}
 	}