You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/20 19:47:54 UTC
[flink] 07/08: [hotfix][core] Fix TypeSerializerUpgradeTestBase for
serializer migration
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f79ecc87d9f584bd03963c89c32e29a7b1ae379
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 20 12:11:09 2020 +0200
[hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration
---
.../typeutils/TypeSerializerUpgradeTestBase.java | 29 ++++++++++++++--------
.../PojoSerializerUpgradeTestSpecifications.java | 21 ++++++++--------
.../runtime/RowSerializerUpgradeTest.java | 2 +-
3 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 0a5ae1d..fe94ee9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -105,7 +105,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
* that the serializer upgrade produced with an expected {@link
* TypeSerializerSchemaCompatibility}.
*/
- Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
+ Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version);
}
private static class ClassLoaderSafePreUpgradeSetup<PreviousElementT> implements PreUpgradeSetup<PreviousElementT> {
@@ -186,9 +186,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version) {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
- return delegateVerifier.schemaCompatibilityMatcher();
+ return delegateVerifier.schemaCompatibilityMatcher(version);
} catch (IOException e) {
throw new RuntimeException(
"Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
@@ -273,13 +273,14 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
assumeThat(
"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
TypeSerializerSchemaCompatibility.incompatible(),
- not(testSpecification.verifier.schemaCompatibilityMatcher()));
+ not(testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion)));
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot = snapshotUnderTest();
TypeSerializer<UpgradedElementT> restoredSerializer = restoredSerializerSnapshot.restoreSerializer();
assertSerializerIsValid(
restoredSerializer,
+ true,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
@@ -296,7 +297,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
assertThat(
upgradeCompatibility,
- testSpecification.verifier.schemaCompatibilityMatcher());
+ testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion));
}
}
@@ -325,6 +326,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
// .. and then assert that the upgraded serializer is valid with the migrated data
assertSerializerIsValid(
upgradedSerializer,
+ false,
migratedData,
testSpecification.verifier.testDataMatcher());
}
@@ -346,6 +348,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer();
assertSerializerIsValid(
reconfiguredUpgradedSerializer,
+ false,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
@@ -366,6 +369,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
assertSerializerIsValid(
upgradedSerializer,
+ false,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
@@ -378,14 +382,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
* <p>A serializer is valid, iff:
* <ul>
* <li>1. The serializer can read and then write again the given serialized data.
- * <li>2. The serializer can produce a serializer snapshot which can be written and then read back again.
+ * <li>2. The serializer can produce a serializer snapshot which can be written and then read
+ * back again.
* <li>3. The serializer's produced snapshot is capable of creating a restore serializer.
* <li>4. The restore serializer created from the serializer snapshot can read and then
- * write again data written by step 1.
+ * write again data written by step 1. Given that the serializer is not a restore
+ * serializer already.
* </ul>
*/
private static <T> void assertSerializerIsValid(
TypeSerializer<T> serializer,
+ boolean isRestoreSerializer,
DataInputView dataInput,
Matcher<T> testDataMatcher) throws Exception {
@@ -394,9 +401,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
serializer,
serializer,
testDataMatcher);
- TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
- TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
- readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+ if (!isRestoreSerializer) {
+ TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
+ TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
+ readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+ }
}
// ------------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
index 4626924..f142401 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.testutils.migration.MigrationVersion;
import org.hamcrest.Matcher;
@@ -228,7 +229,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleAsIs();
}
}
@@ -330,7 +331,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleAfterMigration();
}
}
@@ -395,7 +396,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isIncompatible();
}
}
@@ -519,7 +520,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleAfterMigration();
}
}
@@ -592,7 +593,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isIncompatible();
}
}
@@ -631,7 +632,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
}
}
@@ -680,7 +681,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
}
}
@@ -727,7 +728,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
}
}
@@ -772,7 +773,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
}
}
@@ -819,7 +820,7 @@ public class PojoSerializerUpgradeTestSpecifications {
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index 602f92e..cba9789 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -106,7 +106,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
}
@Override
- public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher() {
+ public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher(MigrationVersion version) {
return TypeSerializerMatchers.isCompatibleAsIs();
}
}