You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/20 19:47:51 UTC
[flink] 04/08: [hotfix][core] Fix warnings in TypeSerializerUpgradeTestBase
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 87653daaa416f229078d5695b3d601d1f052ff38
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 19 16:04:29 2020 +0200
[hotfix][core] Fix warnings in TypeSerializerUpgradeTestBase
---
.../typeutils/TypeSerializerUpgradeTestBase.java | 194 ++++++++++++++-------
.../runtime/PojoSerializerUpgradeTest.java | 2 +-
.../runtime/RowSerializerUpgradeTest.java | 2 +-
pom.xml | 2 +
4 files changed, 134 insertions(+), 66 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 04bc939..0a5ae1d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -44,13 +44,13 @@ import static org.junit.Assume.assumeThat;
/**
* A test base for testing {@link TypeSerializer} upgrades.
- * <p>
- * You can run {@link #generateTestSetupFiles()} on a Flink branch to (re-)generate the test data
- * files.
+ *
+ * <p>You can run {@link #generateTestSetupFiles()} on a Flink branch to (re-)generate the test
+ * data files.
*/
public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedElementT> extends TestLogger {
- protected static final MigrationVersion[] migrationVersions = new MigrationVersion[]{
+ public static final MigrationVersion[] MIGRATION_VERSIONS = new MigrationVersion[]{
MigrationVersion.v1_7,
MigrationVersion.v1_8,
MigrationVersion.v1_9,
@@ -66,14 +66,45 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
// APIs
// ------------------------------------------------------------------------------
+ /**
+ * Setup code for a {@link TestSpecification}. This creates the serializer before upgrade and
+ * test data, that will be written by the created pre-upgrade {@link TypeSerializer}.
+ */
public interface PreUpgradeSetup<PreviousElementT> {
+
+ /**
+ * Creates a pre-upgrade {@link TypeSerializer}.
+ */
TypeSerializer<PreviousElementT> createPriorSerializer();
+
+ /**
+ * Creates test data that will be written using the pre-upgrade {@link TypeSerializer}.
+ */
PreviousElementT createTestData();
}
+ /**
+ * Verification code for a {@link TestSpecification}. This creates the "upgraded" {@link
+ * TypeSerializer} and provides matchers for comparing the deserialized test data and for the
+ * {@link TypeSerializerSchemaCompatibility}.
+ */
public interface UpgradeVerifier<UpgradedElementT> {
+
+ /**
+ * Creates a post-upgrade {@link TypeSerializer}.
+ */
TypeSerializer<UpgradedElementT> createUpgradedSerializer();
+
+ /**
+ * Returns a {@link Matcher} for asserting the deserialized test data.
+ */
Matcher<UpgradedElementT> testDataMatcher();
+
+ /**
+ * Returns a {@link Matcher} for comparing the {@link TypeSerializerSchemaCompatibility}
+ * that the serializer upgrade produced with an expected {@link
+ * TypeSerializerSchemaCompatibility}.
+ */
Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
}
@@ -85,7 +116,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
ClassLoaderSafePreUpgradeSetup(Class<? extends PreUpgradeSetup<PreviousElementT>> delegateSetupClass) throws Exception {
checkNotNull(delegateSetupClass);
Class<? extends PreUpgradeSetup<PreviousElementT>> relocatedDelegateSetupClass =
- ClassRelocator.relocate(delegateSetupClass);
+ ClassRelocator.relocate(delegateSetupClass);
this.setupClassloader = relocatedDelegateSetupClass.getClassLoader();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
@@ -98,7 +129,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
return delegateSetup.createPriorSerializer();
} catch (IOException e) {
- throw new RuntimeException("Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e);
+ throw new RuntimeException(
+ "Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.",
+ e);
}
}
@@ -107,7 +140,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
return delegateSetup.createTestData();
} catch (IOException e) {
- throw new RuntimeException("Error creating test data via ThreadContextClassLoader.", e);
+ throw new RuntimeException(
+ "Error creating test data via ThreadContextClassLoader.",
+ e);
}
}
}
@@ -120,7 +155,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
ClassLoaderSafeUpgradeVerifier(Class<? extends UpgradeVerifier<UpgradedElementT>> delegateVerifierClass) throws Exception {
checkNotNull(delegateVerifierClass);
Class<? extends UpgradeVerifier<UpgradedElementT>> relocatedDelegateVerifierClass =
- ClassRelocator.relocate(delegateVerifierClass);
+ ClassRelocator.relocate(delegateVerifierClass);
this.verifierClassloader = relocatedDelegateVerifierClass.getClassLoader();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
@@ -133,7 +168,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.createUpgradedSerializer();
} catch (IOException e) {
- throw new RuntimeException("Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.", e);
+ throw new RuntimeException(
+ "Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.",
+ e);
}
}
@@ -142,7 +179,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.testDataMatcher();
} catch (IOException e) {
- throw new RuntimeException("Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e);
+ throw new RuntimeException(
+ "Error creating expected test data via ClassLoaderSafeUpgradeVerifier.",
+ e);
}
}
@@ -151,11 +190,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.schemaCompatibilityMatcher();
} catch (IOException e) {
- throw new RuntimeException("Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.", e);
+ throw new RuntimeException(
+ "Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
+ e);
}
}
}
+ /**
+ * Specification of one test scenario. This mainly needs a {@link PreUpgradeSetup} and {@link
+ * UpgradeVerifier}.
+ */
public static class TestSpecification<PreviousElementT, UpgradedElementT> {
private final String name;
private final MigrationVersion migrationVersion;
@@ -186,10 +231,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
private static final int INITIAL_OUTPUT_BUFFER_SIZE = 64;
/**
- * Execute this test to generate test files.
- * Remember to be using the correct branch when generating the test files,
- * e.g. to generate test files for {@link MigrationVersion#v1_8}, you should be under the
- * release-1.8 branch.
+ * Execute this test to generate test files. Remember to be using the correct branch when
+ * generating the test files, e.g. to generate test files for {@link MigrationVersion#v1_8}, you
+ * should be under the release-1.8 branch.
*/
@Ignore
@Test
@@ -207,9 +251,15 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
writeContentsTo(getTestDataFilePath(), testDataOut.getCopyOfBuffer());
// ... then write the serializer snapshot
- DataOutputSerializer serializerSnapshotOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
- writeSerializerSnapshot(serializerSnapshotOut, priorSerializer, testSpecification.migrationVersion);
- writeContentsTo(getSerializerSnapshotFilePath(), serializerSnapshotOut.getCopyOfBuffer());
+ DataOutputSerializer serializerSnapshotOut = new DataOutputSerializer(
+ INITIAL_OUTPUT_BUFFER_SIZE);
+ writeSerializerSnapshot(
+ serializerSnapshotOut,
+ priorSerializer,
+ testSpecification.migrationVersion);
+ writeContentsTo(
+ getSerializerSnapshotFilePath(),
+ serializerSnapshotOut.getCopyOfBuffer());
}
}
@@ -221,17 +271,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
public void restoreSerializerIsValid() throws Exception {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
assumeThat(
- "This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
- TypeSerializerSchemaCompatibility.incompatible(),
- not(testSpecification.verifier.schemaCompatibilityMatcher()));
+ "This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
+ TypeSerializerSchemaCompatibility.incompatible(),
+ not(testSpecification.verifier.schemaCompatibilityMatcher()));
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot = snapshotUnderTest();
TypeSerializer<UpgradedElementT> restoredSerializer = restoredSerializerSnapshot.restoreSerializer();
assertSerializerIsValid(
- restoredSerializer,
- dataUnderTest(),
- testSpecification.verifier.testDataMatcher());
+ restoredSerializer,
+ dataUnderTest(),
+ testSpecification.verifier.testDataMatcher());
}
}
@@ -242,9 +292,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
- restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+ restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
- assertThat(upgradeCompatibility, testSpecification.verifier.schemaCompatibilityMatcher());
+ assertThat(
+ upgradeCompatibility,
+ testSpecification.verifier.schemaCompatibilityMatcher());
}
}
@@ -256,22 +308,25 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
- restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+ restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
- "This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.",
- upgradeCompatibility,
- TypeSerializerMatchers.isCompatibleAfterMigration());
+ "This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.",
+ upgradeCompatibility,
+ TypeSerializerMatchers.isCompatibleAfterMigration());
// migrate the previous data schema,
TypeSerializer<UpgradedElementT> restoreSerializer = restoredSerializerSnapshot.restoreSerializer();
DataInputView migratedData = readAndThenWriteData(
- dataUnderTest(),
- restoreSerializer,
- upgradedSerializer,
- testSpecification.verifier.testDataMatcher());
+ dataUnderTest(),
+ restoreSerializer,
+ upgradedSerializer,
+ testSpecification.verifier.testDataMatcher());
// .. and then assert that the upgraded serializer is valid with the migrated data
- assertSerializerIsValid(upgradedSerializer, migratedData, testSpecification.verifier.testDataMatcher());
+ assertSerializerIsValid(
+ upgradedSerializer,
+ migratedData,
+ testSpecification.verifier.testDataMatcher());
}
}
@@ -282,17 +337,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
- restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+ restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
- "This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.",
- upgradeCompatibility,
- TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
+ "This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.",
+ upgradeCompatibility,
+ TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer();
assertSerializerIsValid(
- reconfiguredUpgradedSerializer,
- dataUnderTest(),
- testSpecification.verifier.testDataMatcher());
+ reconfiguredUpgradedSerializer,
+ dataUnderTest(),
+ testSpecification.verifier.testDataMatcher());
}
}
@@ -303,21 +358,22 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
- restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+ restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
- "This test only applies for test specifications that verify an upgraded serializer that is compatible as is.",
- upgradeCompatibility,
- TypeSerializerMatchers.isCompatibleAsIs());
+ "This test only applies for test specifications that verify an upgraded serializer that is compatible as is.",
+ upgradeCompatibility,
+ TypeSerializerMatchers.isCompatibleAsIs());
assertSerializerIsValid(
- upgradedSerializer,
- dataUnderTest(),
- testSpecification.verifier.testDataMatcher());
+ upgradedSerializer,
+ dataUnderTest(),
+ testSpecification.verifier.testDataMatcher());
}
}
/**
- * Asserts that a given {@link TypeSerializer} is valid, given a {@link DataInputView} of serialized data.
+ * Asserts that a given {@link TypeSerializer} is valid, given a {@link DataInputView} of
+ * serialized data.
*
* <p>A serializer is valid, iff:
* <ul>
@@ -333,7 +389,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
DataInputView dataInput,
Matcher<T> testDataMatcher) throws Exception {
- DataInputView serializedData = readAndThenWriteData(dataInput, serializer, serializer, testDataMatcher);
+ DataInputView serializedData = readAndThenWriteData(
+ dataInput,
+ serializer,
+ serializer,
+ testDataMatcher);
TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
@@ -357,19 +417,18 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
private TypeSerializerSnapshot<UpgradedElementT> snapshotUnderTest() throws Exception {
return readSerializerSnapshot(
- contentsOf(getSerializerSnapshotFilePath()),
- testSpecification.migrationVersion);
+ contentsOf(getSerializerSnapshotFilePath()),
+ testSpecification.migrationVersion);
}
- private DataInputView dataUnderTest() throws IOException {
+ private DataInputView dataUnderTest() {
return contentsOf(getTestDataFilePath());
}
private static void writeContentsTo(Path path, byte[] bytes) {
try {
Files.write(path, bytes);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("Failed to write to " + path, e);
}
}
@@ -378,8 +437,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
try {
byte[] bytes = Files.readAllBytes(path);
return new DataInputDeserializer(bytes);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("Failed to read contents of " + path, e);
}
}
@@ -401,15 +459,19 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
TypeSerializer<T> serializer) throws IOException {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
- out, serializer.snapshotConfiguration(), serializer);
+ out, serializer.snapshotConfiguration(), serializer);
}
+ @SuppressWarnings("deprecation")
private static <T> void writeSerializerSnapshotPre17Format(
DataOutputView out,
TypeSerializer<T> serializer) throws IOException {
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
- out, Collections.singletonList(Tuple2.of(serializer, serializer.snapshotConfiguration())));
+ out,
+ Collections.singletonList(Tuple2.of(
+ serializer,
+ serializer.snapshotConfiguration())));
}
private static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
@@ -429,16 +491,18 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
ClassLoader userCodeClassLoader) throws IOException {
return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
- in, userCodeClassLoader, null);
+ in, userCodeClassLoader, null);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
private static <T> TypeSerializerSnapshot<T> readSerializerSnapshotPre17Format(
DataInputView in,
ClassLoader userCodeClassLoader) throws IOException {
List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializerSnapshotPair =
- TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
+ TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+ in,
+ userCodeClassLoader);
return (TypeSerializerSnapshot<T>) serializerSnapshotPair.get(0).f1;
}
@@ -463,6 +527,8 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
writeSerializerSnapshotCurrentFormat(out, serializer);
DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
- return readSerializerSnapshotCurrentFormat(in, Thread.currentThread().getContextClassLoader());
+ return readSerializerSnapshotCurrentFormat(
+ in,
+ Thread.currentThread().getContextClassLoader());
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
index f642afe..fac1bde 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
@@ -40,7 +40,7 @@ public class PojoSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Obj
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
- for (MigrationVersion migrationVersion : migrationVersions) {
+ for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
testSpecifications.add(
new TestSpecification<>(
"pojo-serializer-identical-schema",
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index 0c9a47b..602f92e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -50,7 +50,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
- for (MigrationVersion migrationVersion : migrationVersions) {
+ for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
testSpecifications.add(
new TestSpecification<>(
"row-serializer",
diff --git a/pom.xml b/pom.xml
index e72adf7..136738e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1456,6 +1456,8 @@ under the License.
<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude>
<!-- snapshots -->
+ <exclude>**/src/test/resources/**/serializer-snapshot</exclude>
+ <exclude>**/src/test/resources/**/test-data</exclude>
<exclude>**/src/test/resources/*-snapshot</exclude>
<exclude>**/src/test/resources/*.snapshot</exclude>
<exclude>**/src/test/resources/*-savepoint</exclude>