You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/20 19:47:47 UTC

[flink] branch release-1.11 updated (e321e48 -> 75a2b5f)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e321e48  [FLINK-17730][CI] Increase 'no output timeout' to 15 minutes
     new d9d7c79  [hotfix] Fix directory creation in TypeSerializerUpgradeTestBase
     new 1d3425b  [FLINK-13632] In TypeSerializerUpgradeTestBase.UpgradeVerifier return matcher instead of data
     new 44cc684  [FLINK-13632] Port RowSerializer upgrade test to TypeSerializerUpgradeTestBase
     new 87653da  [hotfix][core] Fix warnings in TypeSerializerUpgradeTestBase
     new 4dd729b  [hotfix][core] Fix migration version comparison for 1.10
     new a2d6854  [hotfix][core] Add open interval in MigrationVersion
     new 3f79ecc  [hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration
     new 75a2b5f  [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/RowSerializer.java  |  20 +-
 .../typeutils/TypeSerializerUpgradeTestBase.java   | 236 ++++++++++++++-------
 .../runtime/PojoSerializerUpgradeTest.java         |   2 +-
 .../PojoSerializerUpgradeTestSpecifications.java   |  61 +++---
 .../runtime/RowSerializerMigrationTest.java        |  66 ------
 .../runtime/RowSerializerUpgradeTest.java          | 127 +++++++++++
 .../testutils/migration/MigrationVersion.java      |  18 +-
 .../test/resources/flink-1.6-row-serializer-data   | Bin 240 -> 0 bytes
 .../resources/flink-1.6-row-serializer-snapshot    | Bin 1444 -> 0 bytes
 .../test/resources/flink-1.7-row-serializer-data   | Bin 240 -> 0 bytes
 .../resources/flink-1.7-row-serializer-snapshot    | Bin 1454 -> 0 bytes
 .../row-serializer-1.10/serializer-snapshot        | Bin 0 -> 465 bytes
 .../test/resources/row-serializer-1.10/test-data   | Bin 0 -> 20 bytes
 pom.xml                                            |   2 +
 14 files changed, 346 insertions(+), 186 deletions(-)
 delete mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
 delete mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-data
 delete mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-snapshot
 delete mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-data
 delete mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/row-serializer-1.10/serializer-snapshot
 create mode 100644 flink-core/src/test/resources/row-serializer-1.10/test-data


[flink] 02/08: [FLINK-13632] In TypeSerializerUpgradeTestBase.UpgradeVerifier return matcher instead of data

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d3425b0f084d0033116147e477c544e5f174fe5
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon Jan 27 17:48:19 2020 +0100

    [FLINK-13632] In TypeSerializerUpgradeTestBase.UpgradeVerifier return matcher instead of data
    
    Before, the test was using equals() to compare the expected data to the actual data. This does not work for types that don't have a proper equals() implementation. Now the verifier returns a matcher instead, so tests that use such types can supply an appropriate matcher. Most tests can simply use the is() matcher if the test data class has a proper equals() method.
---
 .../typeutils/TypeSerializerUpgradeTestBase.java   | 27 +++++++--------
 .../PojoSerializerUpgradeTestSpecifications.java   | 40 ++++++++++++----------
 2 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 92341cb..04bc939 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -39,7 +39,6 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeThat;
 
@@ -74,7 +73,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 	public interface UpgradeVerifier<UpgradedElementT> {
 		TypeSerializer<UpgradedElementT> createUpgradedSerializer();
-		UpgradedElementT expectedTestData();
+		Matcher<UpgradedElementT> testDataMatcher();
 		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
 	}
 
@@ -139,9 +138,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		}
 
 		@Override
-		public UpgradedElementT expectedTestData() {
+		public Matcher<UpgradedElementT> testDataMatcher() {
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
-				return delegateVerifier.expectedTestData();
+				return delegateVerifier.testDataMatcher();
 			} catch (IOException e) {
 				throw new RuntimeException("Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e);
 			}
@@ -232,7 +231,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			assertSerializerIsValid(
 				restoredSerializer,
 				dataUnderTest(),
-				testSpecification.verifier.expectedTestData());
+				testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -269,10 +268,10 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 				dataUnderTest(),
 				restoreSerializer,
 				upgradedSerializer,
-				testSpecification.verifier.expectedTestData());
+				testSpecification.verifier.testDataMatcher());
 
 			// .. and then assert that the upgraded serializer is valid with the migrated data
-			assertSerializerIsValid(upgradedSerializer, migratedData, testSpecification.verifier.expectedTestData());
+			assertSerializerIsValid(upgradedSerializer, migratedData, testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -293,7 +292,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			assertSerializerIsValid(
 				reconfiguredUpgradedSerializer,
 				dataUnderTest(),
-				testSpecification.verifier.expectedTestData());
+				testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -313,7 +312,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			assertSerializerIsValid(
 				upgradedSerializer,
 				dataUnderTest(),
-				testSpecification.verifier.expectedTestData());
+				testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -332,12 +331,12 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	private static <T> void assertSerializerIsValid(
 			TypeSerializer<T> serializer,
 			DataInputView dataInput,
-			T expectedData) throws Exception {
+			Matcher<T> testDataMatcher) throws Exception {
 
-		DataInputView serializedData = readAndThenWriteData(dataInput, serializer, serializer, expectedData);
+		DataInputView serializedData = readAndThenWriteData(dataInput, serializer, serializer, testDataMatcher);
 		TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
 		TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
-		readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, expectedData);
+		readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
 	}
 
 	// ------------------------------------------------------------------------------
@@ -447,10 +446,10 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			DataInputView originalDataInput,
 			TypeSerializer<T> readSerializer,
 			TypeSerializer<T> writeSerializer,
-			T sanityCheckData) throws IOException {
+			Matcher<T> testDataMatcher) throws IOException {
 
 		T data = readSerializer.deserialize(originalDataInput);
-		assertEquals(sanityCheckData, data);
+		assertThat(data, testDataMatcher);
 
 		DataOutputSerializer out = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
 		writeSerializer.serialize(data, out);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
index c860386..4626924 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
@@ -30,6 +30,7 @@ import org.hamcrest.Matcher;
 
 import java.util.Objects;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertSame;
 
 /**
@@ -222,8 +223,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojo("Gordon", 27, StaticSchemaPojo.Color.BLUE, false);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new StaticSchemaPojo("Gordon", 27, StaticSchemaPojo.Color.BLUE, false));
 		}
 
 		@Override
@@ -324,8 +325,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public PojoAfterSchemaUpgrade expectedTestData() {
-			return new PojoAfterSchemaUpgrade("Gordon", 27, null, false);
+		public Matcher<PojoAfterSchemaUpgrade> testDataMatcher() {
+			return is(new PojoAfterSchemaUpgrade("Gordon", 27, null, false));
 		}
 
 		@Override
@@ -389,7 +390,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public PojoWithStringField expectedTestData() {
+		public Matcher<PojoWithStringField> testDataMatcher() {
 			throw new UnsupportedOperationException();
 		}
 
@@ -513,8 +514,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public BasePojo expectedTestData() {
-			return new SubclassPojoAfterSchemaUpgrade(911108, "Gordon", true, 0, 0.0);
+		public Matcher<BasePojo> testDataMatcher() {
+			return is(new SubclassPojoAfterSchemaUpgrade(911108, "Gordon", true, 0, 0.0));
 		}
 
 		@Override
@@ -586,8 +587,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new SubclassPojoWithStringField("gt", 7, StaticSchemaPojo.Color.BLUE, false, "911108");
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new SubclassPojoWithStringField("gt", 7, StaticSchemaPojo.Color.BLUE, false, "911108"));
 		}
 
 		@Override
@@ -625,8 +626,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojoSubclassA("gt", 7, StaticSchemaPojo.Color.BLUE, false, 911108);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new StaticSchemaPojoSubclassA("gt", 7, StaticSchemaPojo.Color.BLUE, false, 911108));
 		}
 
 		@Override
@@ -673,8 +674,9 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+
+			return is(new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true));
 		}
 
 		@Override
@@ -720,8 +722,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true));
 		}
 
 		@Override
@@ -765,8 +767,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojoSubclassA("gt", 7, StaticSchemaPojo.Color.BLUE, false, 911108);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new StaticSchemaPojoSubclassA("gt", 7, StaticSchemaPojo.Color.BLUE, false, 911108));
 		}
 
 		@Override
@@ -812,8 +814,8 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public StaticSchemaPojo expectedTestData() {
-			return new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true);
+		public Matcher<StaticSchemaPojo> testDataMatcher() {
+			return is(new StaticSchemaPojoSubclassB("gt", 7, StaticSchemaPojo.Color.BLUE, false, true));
 		}
 
 		@Override


[flink] 08/08: [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75a2b5fd2b8f3df36de219a055822c846143a6c3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 19 16:02:33 2020 +0200

    [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer
---
 .../api/java/typeutils/runtime/RowSerializer.java  |  20 +++++++-----
 .../runtime/RowSerializerUpgradeTest.java          |  34 +++++++++++++++------
 .../row-serializer-1.10/serializer-snapshot        | Bin 0 -> 465 bytes
 .../test/resources/row-serializer-1.10/test-data   | Bin 0 -> 20 bytes
 4 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index a492457..4684368 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -60,7 +60,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	public static final int ROW_KIND_OFFSET = 2;
 
-	private static final long serialVersionUID = 2L;
+	// legacy, don't touch until we drop support for 1.9 savepoints
+	private static final long serialVersionUID = 1L;
 
 	private final boolean legacyModeEnabled;
 
@@ -367,14 +368,13 @@ public final class RowSerializer extends TypeSerializer<Row> {
 	/**
 	 * A {@link TypeSerializerSnapshot} for RowSerializer.
 	 */
-	// TODO not fully functional yet due to FLINK-17520
 	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
 		private static final int VERSION = 3;
 
-		private static final int VERSION_WITHOUT_ROW_KIND = 2;
+		private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-		private boolean legacyModeEnabled = false;
+		private int readVersion = VERSION;
 
 		public RowSerializerSnapshot() {
 			super(RowSerializer.class);
@@ -394,9 +394,15 @@ public final class RowSerializer extends TypeSerializer<Row> {
 				int readOuterSnapshotVersion,
 				DataInputView in,
 				ClassLoader userCodeClassLoader) {
-			if (readOuterSnapshotVersion == VERSION_WITHOUT_ROW_KIND) {
-				legacyModeEnabled = true;
+			readVersion = readOuterSnapshotVersion;
+		}
+
+		@Override
+		protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(RowSerializer newSerializer) {
+			if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
+				return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
 			}
+			return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
 		}
 
 		@Override
@@ -406,7 +412,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 		@Override
 		protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
-			return new RowSerializer(nestedSerializers, legacyModeEnabled);
+			return new RowSerializer(nestedSerializers, readVersion <= LAST_VERSION_WITHOUT_ROW_KIND);
 		}
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index cba9789..f30893c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.hamcrest.Matcher;
 import org.junit.runner.RunWith;
@@ -50,7 +51,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
 		ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
-		for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
+		for (MigrationVersion migrationVersion : MigrationVersion.v1_10.orHigher()) {
 			testSpecifications.add(
 				new TestSpecification<>(
 					"row-serializer",
@@ -61,8 +62,14 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 		return testSpecifications;
 	}
 
-	public static TypeSerializer<Row> stringLongRowSupplier() {
-		RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+	public static TypeSerializer<Row> createRowSerializer() {
+		// in older branches, this writes in old format WITHOUT row kind;
+		// in newer branches >= 1.11, this writes in new format WITH row kind
+		final RowTypeInfo rowTypeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
 		return rowTypeInfo.createSerializer(new ExecutionConfig());
 	}
 
@@ -76,14 +83,16 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 	public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
 		@Override
 		public TypeSerializer<Row> createPriorSerializer() {
-			return stringLongRowSupplier();
+			return createRowSerializer();
 		}
 
 		@Override
 		public Row createTestData() {
-			Row row = new Row(2);
-			row.setField(0, "flink");
+			Row row = new Row(4);
+			row.setField(0, null);
 			row.setField(1, 42L);
+			row.setField(2, "My string.");
+			row.setField(3, null);
 			return row;
 		}
 	}
@@ -94,20 +103,25 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 	public static final class RowSerializerVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<Row> {
 		@Override
 		public TypeSerializer<Row> createUpgradedSerializer() {
-			return stringLongRowSupplier();
+			return createRowSerializer();
 		}
 
 		@Override
 		public Matcher<Row> testDataMatcher() {
-			Row row = new Row(2);
-			row.setField(0, "flink");
+			Row row = new Row(RowKind.INSERT, 4);
+			row.setField(0, null);
 			row.setField(1, 42L);
+			row.setField(2, "My string.");
+			row.setField(3, null);
 			return is(row);
 		}
 
 		@Override
 		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher(MigrationVersion version) {
-			return TypeSerializerMatchers.isCompatibleAsIs();
+			if (version.isNewerVersionThan(MigrationVersion.v1_10)) {
+				return TypeSerializerMatchers.isCompatibleAsIs();
+			}
+			return TypeSerializerMatchers.isCompatibleAfterMigration();
 		}
 	}
 }
diff --git a/flink-core/src/test/resources/row-serializer-1.10/serializer-snapshot b/flink-core/src/test/resources/row-serializer-1.10/serializer-snapshot
new file mode 100644
index 0000000..0e01f99
Binary files /dev/null and b/flink-core/src/test/resources/row-serializer-1.10/serializer-snapshot differ
diff --git a/flink-core/src/test/resources/row-serializer-1.10/test-data b/flink-core/src/test/resources/row-serializer-1.10/test-data
new file mode 100644
index 0000000..c4b6589
Binary files /dev/null and b/flink-core/src/test/resources/row-serializer-1.10/test-data differ


[flink] 04/08: [hotfix][core] Fix warnings in TypeSerializerUpgradeTestBase

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87653daaa416f229078d5695b3d601d1f052ff38
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 19 16:04:29 2020 +0200

    [hotfix][core] Fix warnings in TypeSerializerUpgradeTestBase
---
 .../typeutils/TypeSerializerUpgradeTestBase.java   | 194 ++++++++++++++-------
 .../runtime/PojoSerializerUpgradeTest.java         |   2 +-
 .../runtime/RowSerializerUpgradeTest.java          |   2 +-
 pom.xml                                            |   2 +
 4 files changed, 134 insertions(+), 66 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 04bc939..0a5ae1d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -44,13 +44,13 @@ import static org.junit.Assume.assumeThat;
 
 /**
  * A test base for testing {@link TypeSerializer} upgrades.
- * <p>
- * You can run {@link #generateTestSetupFiles()} on a Flink branch to (re-)generate the test data
- * files.
+ *
+ * <p>You can run {@link #generateTestSetupFiles()} on a Flink branch to (re-)generate the test
+ * data files.
  */
 public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedElementT> extends TestLogger {
 
-	protected static final MigrationVersion[] migrationVersions = new MigrationVersion[]{
+	public static final MigrationVersion[] MIGRATION_VERSIONS = new MigrationVersion[]{
 			MigrationVersion.v1_7,
 			MigrationVersion.v1_8,
 			MigrationVersion.v1_9,
@@ -66,14 +66,45 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	//  APIs
 	// ------------------------------------------------------------------------------
 
+	/**
+	 * Setup code for a {@link TestSpecification}. This creates the serializer before upgrade and
+	 * test data, that will be written by the created pre-upgrade {@link TypeSerializer}.
+	 */
 	public interface PreUpgradeSetup<PreviousElementT> {
+
+		/**
+		 * Creates a pre-upgrade {@link TypeSerializer}.
+		 */
 		TypeSerializer<PreviousElementT> createPriorSerializer();
+
+		/**
+		 * Creates test data that will be written using the pre-upgrade {@link TypeSerializer}.
+		 */
 		PreviousElementT createTestData();
 	}
 
+	/**
+	 * Verification code for a {@link TestSpecification}. This creates the "upgraded" {@link
+	 * TypeSerializer} and provides matchers for comparing the deserialized test data and for the
+	 * {@link TypeSerializerSchemaCompatibility}.
+	 */
 	public interface UpgradeVerifier<UpgradedElementT> {
+
+		/**
+		 * Creates a post-upgrade {@link TypeSerializer}.
+		 */
 		TypeSerializer<UpgradedElementT> createUpgradedSerializer();
+
+		/**
+		 * Returns a {@link Matcher} for asserting the deserialized test data.
+		 */
 		Matcher<UpgradedElementT> testDataMatcher();
+
+		/**
+		 * Returns a {@link Matcher} for comparing the {@link TypeSerializerSchemaCompatibility}
+		 * that the serializer upgrade produced with an expected {@link
+		 * TypeSerializerSchemaCompatibility}.
+		 */
 		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
 	}
 
@@ -85,7 +116,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		ClassLoaderSafePreUpgradeSetup(Class<? extends PreUpgradeSetup<PreviousElementT>> delegateSetupClass) throws Exception {
 			checkNotNull(delegateSetupClass);
 			Class<? extends PreUpgradeSetup<PreviousElementT>> relocatedDelegateSetupClass =
-				ClassRelocator.relocate(delegateSetupClass);
+					ClassRelocator.relocate(delegateSetupClass);
 
 			this.setupClassloader = relocatedDelegateSetupClass.getClassLoader();
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
@@ -98,7 +129,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
 				return delegateSetup.createPriorSerializer();
 			} catch (IOException e) {
-				throw new RuntimeException("Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e);
+				throw new RuntimeException(
+						"Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.",
+						e);
 			}
 		}
 
@@ -107,7 +140,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(setupClassloader)) {
 				return delegateSetup.createTestData();
 			} catch (IOException e) {
-				throw new RuntimeException("Error creating test data via ThreadContextClassLoader.", e);
+				throw new RuntimeException(
+						"Error creating test data via ThreadContextClassLoader.",
+						e);
 			}
 		}
 	}
@@ -120,7 +155,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		ClassLoaderSafeUpgradeVerifier(Class<? extends UpgradeVerifier<UpgradedElementT>> delegateVerifierClass) throws Exception {
 			checkNotNull(delegateVerifierClass);
 			Class<? extends UpgradeVerifier<UpgradedElementT>> relocatedDelegateVerifierClass =
-				ClassRelocator.relocate(delegateVerifierClass);
+					ClassRelocator.relocate(delegateVerifierClass);
 
 			this.verifierClassloader = relocatedDelegateVerifierClass.getClassLoader();
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
@@ -133,7 +168,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
 				return delegateVerifier.createUpgradedSerializer();
 			} catch (IOException e) {
-				throw new RuntimeException("Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.", e);
+				throw new RuntimeException(
+						"Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.",
+						e);
 			}
 		}
 
@@ -142,7 +179,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
 				return delegateVerifier.testDataMatcher();
 			} catch (IOException e) {
-				throw new RuntimeException("Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e);
+				throw new RuntimeException(
+						"Error creating expected test data via ClassLoaderSafeUpgradeVerifier.",
+						e);
 			}
 		}
 
@@ -151,11 +190,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
 				return delegateVerifier.schemaCompatibilityMatcher();
 			} catch (IOException e) {
-				throw new RuntimeException("Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.", e);
+				throw new RuntimeException(
+						"Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
+						e);
 			}
 		}
 	}
 
+	/**
+	 * Specification of one test scenario. This mainly needs a {@link PreUpgradeSetup} and {@link
+	 * UpgradeVerifier}.
+	 */
 	public static class TestSpecification<PreviousElementT, UpgradedElementT> {
 		private final String name;
 		private final MigrationVersion migrationVersion;
@@ -186,10 +231,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	private static final int INITIAL_OUTPUT_BUFFER_SIZE = 64;
 
 	/**
-	 * Execute this test to generate test files.
-	 * Remember to be using the correct branch when generating the test files,
-	 * e.g. to generate test files for {@link MigrationVersion#v1_8}, you should be under the
-	 * release-1.8 branch.
+	 * Execute this test to generate test files. Remember to be using the correct branch when
+	 * generating the test files, e.g. to generate test files for {@link MigrationVersion#v1_8}, you
+	 * should be under the release-1.8 branch.
 	 */
 	@Ignore
 	@Test
@@ -207,9 +251,15 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			writeContentsTo(getTestDataFilePath(), testDataOut.getCopyOfBuffer());
 
 			// ... then write the serializer snapshot
-			DataOutputSerializer serializerSnapshotOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
-			writeSerializerSnapshot(serializerSnapshotOut, priorSerializer, testSpecification.migrationVersion);
-			writeContentsTo(getSerializerSnapshotFilePath(), serializerSnapshotOut.getCopyOfBuffer());
+			DataOutputSerializer serializerSnapshotOut = new DataOutputSerializer(
+					INITIAL_OUTPUT_BUFFER_SIZE);
+			writeSerializerSnapshot(
+					serializerSnapshotOut,
+					priorSerializer,
+					testSpecification.migrationVersion);
+			writeContentsTo(
+					getSerializerSnapshotFilePath(),
+					serializerSnapshotOut.getCopyOfBuffer());
 		}
 	}
 
@@ -221,17 +271,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	public void restoreSerializerIsValid() throws Exception {
 		try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
 			assumeThat(
-				"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
-				TypeSerializerSchemaCompatibility.incompatible(),
-				not(testSpecification.verifier.schemaCompatibilityMatcher()));
+					"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
+					TypeSerializerSchemaCompatibility.incompatible(),
+					not(testSpecification.verifier.schemaCompatibilityMatcher()));
 
 			TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot = snapshotUnderTest();
 
 			TypeSerializer<UpgradedElementT> restoredSerializer = restoredSerializerSnapshot.restoreSerializer();
 			assertSerializerIsValid(
-				restoredSerializer,
-				dataUnderTest(),
-				testSpecification.verifier.testDataMatcher());
+					restoredSerializer,
+					dataUnderTest(),
+					testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -242,9 +292,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
 
 			TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
-				restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+					restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
 
-			assertThat(upgradeCompatibility, testSpecification.verifier.schemaCompatibilityMatcher());
+			assertThat(
+					upgradeCompatibility,
+					testSpecification.verifier.schemaCompatibilityMatcher());
 		}
 	}
 
@@ -256,22 +308,25 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
 
 			TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
-				restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+					restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
 			assumeThat(
-				"This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.",
-				upgradeCompatibility,
-				TypeSerializerMatchers.isCompatibleAfterMigration());
+					"This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.",
+					upgradeCompatibility,
+					TypeSerializerMatchers.isCompatibleAfterMigration());
 
 			// migrate the previous data schema,
 			TypeSerializer<UpgradedElementT> restoreSerializer = restoredSerializerSnapshot.restoreSerializer();
 			DataInputView migratedData = readAndThenWriteData(
-				dataUnderTest(),
-				restoreSerializer,
-				upgradedSerializer,
-				testSpecification.verifier.testDataMatcher());
+					dataUnderTest(),
+					restoreSerializer,
+					upgradedSerializer,
+					testSpecification.verifier.testDataMatcher());
 
 			// .. and then assert that the upgraded serializer is valid with the migrated data
-			assertSerializerIsValid(upgradedSerializer, migratedData, testSpecification.verifier.testDataMatcher());
+			assertSerializerIsValid(
+					upgradedSerializer,
+					migratedData,
+					testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -282,17 +337,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
 
 			TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
-				restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+					restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
 			assumeThat(
-				"This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.",
-				upgradeCompatibility,
-				TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
+					"This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.",
+					upgradeCompatibility,
+					TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
 
 			TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer();
 			assertSerializerIsValid(
-				reconfiguredUpgradedSerializer,
-				dataUnderTest(),
-				testSpecification.verifier.testDataMatcher());
+					reconfiguredUpgradedSerializer,
+					dataUnderTest(),
+					testSpecification.verifier.testDataMatcher());
 		}
 	}
 
@@ -303,21 +358,22 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> upgradedSerializer = testSpecification.verifier.createUpgradedSerializer();
 
 			TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
-				restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
+					restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
 			assumeThat(
-				"This test only applies for test specifications that verify an upgraded serializer that is compatible as is.",
-				upgradeCompatibility,
-				TypeSerializerMatchers.isCompatibleAsIs());
+					"This test only applies for test specifications that verify an upgraded serializer that is compatible as is.",
+					upgradeCompatibility,
+					TypeSerializerMatchers.isCompatibleAsIs());
 
 			assertSerializerIsValid(
-				upgradedSerializer,
-				dataUnderTest(),
-				testSpecification.verifier.testDataMatcher());
+					upgradedSerializer,
+					dataUnderTest(),
+					testSpecification.verifier.testDataMatcher());
 		}
 	}
 
 	/**
-	 * Asserts that a given {@link TypeSerializer} is valid, given a {@link DataInputView} of serialized data.
+	 * Asserts that a given {@link TypeSerializer} is valid, given a {@link DataInputView} of
+	 * serialized data.
 	 *
 	 * <p>A serializer is valid, iff:
 	 * <ul>
@@ -333,7 +389,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			DataInputView dataInput,
 			Matcher<T> testDataMatcher) throws Exception {
 
-		DataInputView serializedData = readAndThenWriteData(dataInput, serializer, serializer, testDataMatcher);
+		DataInputView serializedData = readAndThenWriteData(
+				dataInput,
+				serializer,
+				serializer,
+				testDataMatcher);
 		TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
 		TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
 		readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
@@ -357,19 +417,18 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 	private TypeSerializerSnapshot<UpgradedElementT> snapshotUnderTest() throws Exception {
 		return readSerializerSnapshot(
-			contentsOf(getSerializerSnapshotFilePath()),
-			testSpecification.migrationVersion);
+				contentsOf(getSerializerSnapshotFilePath()),
+				testSpecification.migrationVersion);
 	}
 
-	private DataInputView dataUnderTest() throws IOException {
+	private DataInputView dataUnderTest() {
 		return contentsOf(getTestDataFilePath());
 	}
 
 	private static void writeContentsTo(Path path, byte[] bytes) {
 		try {
 			Files.write(path, bytes);
-		}
-		catch (IOException e) {
+		} catch (IOException e) {
 			throw new RuntimeException("Failed to write to " + path, e);
 		}
 	}
@@ -378,8 +437,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		try {
 			byte[] bytes = Files.readAllBytes(path);
 			return new DataInputDeserializer(bytes);
-		}
-		catch (IOException e) {
+		} catch (IOException e) {
 			throw new RuntimeException("Failed to read contents of " + path, e);
 		}
 	}
@@ -401,15 +459,19 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<T> serializer) throws IOException {
 
 		TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
-			out, serializer.snapshotConfiguration(), serializer);
+				out, serializer.snapshotConfiguration(), serializer);
 	}
 
+	@SuppressWarnings("deprecation")
 	private static <T> void writeSerializerSnapshotPre17Format(
 			DataOutputView out,
 			TypeSerializer<T> serializer) throws IOException {
 
 		TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
-			out, Collections.singletonList(Tuple2.of(serializer, serializer.snapshotConfiguration())));
+				out,
+				Collections.singletonList(Tuple2.of(
+						serializer,
+						serializer.snapshotConfiguration())));
 	}
 
 	private static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
@@ -429,16 +491,18 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			ClassLoader userCodeClassLoader) throws IOException {
 
 		return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
-			in, userCodeClassLoader, null);
+				in, userCodeClassLoader, null);
 	}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "deprecation"})
 	private static <T> TypeSerializerSnapshot<T> readSerializerSnapshotPre17Format(
 			DataInputView in,
 			ClassLoader userCodeClassLoader) throws IOException {
 
 		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializerSnapshotPair =
-			TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
+				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+						in,
+						userCodeClassLoader);
 		return (TypeSerializerSnapshot<T>) serializerSnapshotPair.get(0).f1;
 	}
 
@@ -463,6 +527,8 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		writeSerializerSnapshotCurrentFormat(out, serializer);
 
 		DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
-		return readSerializerSnapshotCurrentFormat(in, Thread.currentThread().getContextClassLoader());
+		return readSerializerSnapshotCurrentFormat(
+				in,
+				Thread.currentThread().getContextClassLoader());
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
index f642afe..fac1bde 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java
@@ -40,7 +40,7 @@ public class PojoSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Obj
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
 		ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
-		for (MigrationVersion migrationVersion : migrationVersions) {
+		for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
 			testSpecifications.add(
 				new TestSpecification<>(
 					"pojo-serializer-identical-schema",
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index 0c9a47b..602f92e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -50,7 +50,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
 		ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
-		for (MigrationVersion migrationVersion : migrationVersions) {
+		for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
 			testSpecifications.add(
 				new TestSpecification<>(
 					"row-serializer",
diff --git a/pom.xml b/pom.xml
index e72adf7..136738e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1456,6 +1456,8 @@ under the License.
 						<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude>
 
 						<!-- snapshots -->
+						<exclude>**/src/test/resources/**/serializer-snapshot</exclude>
+						<exclude>**/src/test/resources/**/test-data</exclude>
 						<exclude>**/src/test/resources/*-snapshot</exclude>
 						<exclude>**/src/test/resources/*.snapshot</exclude>
 						<exclude>**/src/test/resources/*-savepoint</exclude>


[flink] 06/08: [hotfix][core] Add open interval in MigrationVersion

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2d6854212a85339a46ccef874c340a5e3e67bec
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 19 15:52:40 2020 +0200

    [hotfix][core] Add open interval in MigrationVersion
---
 .../apache/flink/testutils/migration/MigrationVersion.java  | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 5f964ec..4a9cb7d 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.testutils.migration;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * Enumeration for Flink versions, used in migration integration tests
  * to indicate the migrated snapshot version.
@@ -50,4 +54,13 @@ public enum MigrationVersion {
 	public boolean isNewerVersionThan(MigrationVersion otherVersion) {
 		return this.ordinal() > otherVersion.ordinal();
 	}
+
+	/**
+	 * Returns all versions equal to or higher than the selected version.
+	 */
+	public List<MigrationVersion> orHigher() {
+		return Stream.of(MigrationVersion.values())
+			.filter(v -> this.ordinal() <= v.ordinal())
+			.collect(Collectors.toList());
+	}
 }


[flink] 01/08: [hotfix] Fix directory creation in TypeSerializerUpgradeTestBase

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d9d7c79fbd123dc80a1939eafe05bb3bc9e11b8a
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon Jan 20 14:01:07 2020 +0100

    [hotfix] Fix directory creation in TypeSerializerUpgradeTestBase
---
 .../flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index a355837..92341cb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -195,7 +195,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	@Ignore
 	@Test
 	public void generateTestSetupFiles() throws Exception {
-		Files.createDirectory(getSerializerSnapshotFilePath().getParent());
+		Files.createDirectories(getSerializerSnapshotFilePath().getParent());
 
 		try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(testSpecification.setup.setupClassloader)) {
 			TypeSerializer<PreviousElementT> priorSerializer = testSpecification.setup.createPriorSerializer();


[flink] 05/08: [hotfix][core] Fix migration version comparison for 1.10

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4dd729bc6946ca32a62f822bad551f393e077a90
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 19 17:18:25 2020 +0200

    [hotfix][core] Fix migration version comparison for 1.10
---
 .../java/org/apache/flink/testutils/migration/MigrationVersion.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 5a6260a..5f964ec 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -25,7 +25,8 @@ package org.apache.flink.testutils.migration;
 public enum MigrationVersion {
 
 	// NOTE: the version strings must not change,
-	// as they are used to locate snapshot file paths
+	// as they are used to locate snapshot file paths.
+	// The definition order matters for performing version arithmetic.
 	v1_3("1.3"),
 	v1_4("1.4"),
 	v1_5("1.5"),
@@ -47,6 +48,6 @@ public enum MigrationVersion {
 	}
 
 	public boolean isNewerVersionThan(MigrationVersion otherVersion) {
-		return Double.valueOf(versionStr) > Double.valueOf(otherVersion.versionStr);
+		return this.ordinal() > otherVersion.ordinal();
 	}
 }


[flink] 07/08: [hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f79ecc87d9f584bd03963c89c32e29a7b1ae379
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 20 12:11:09 2020 +0200

    [hotfix][core] Fix TypeSerializerUpgradeTestBase for serializer migration
---
 .../typeutils/TypeSerializerUpgradeTestBase.java   | 29 ++++++++++++++--------
 .../PojoSerializerUpgradeTestSpecifications.java   | 21 ++++++++--------
 .../runtime/RowSerializerUpgradeTest.java          |  2 +-
 3 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 0a5ae1d..fe94ee9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -105,7 +105,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		 * that the serializer upgrade produced with an expected {@link
 		 * TypeSerializerSchemaCompatibility}.
 		 */
-		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher();
+		Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version);
 	}
 
 	private static class ClassLoaderSafePreUpgradeSetup<PreviousElementT> implements PreUpgradeSetup<PreviousElementT> {
@@ -186,9 +186,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(MigrationVersion version) {
 			try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifierClassloader)) {
-				return delegateVerifier.schemaCompatibilityMatcher();
+				return delegateVerifier.schemaCompatibilityMatcher(version);
 			} catch (IOException e) {
 				throw new RuntimeException(
 						"Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
@@ -273,13 +273,14 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			assumeThat(
 					"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
 					TypeSerializerSchemaCompatibility.incompatible(),
-					not(testSpecification.verifier.schemaCompatibilityMatcher()));
+					not(testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion)));
 
 			TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot = snapshotUnderTest();
 
 			TypeSerializer<UpgradedElementT> restoredSerializer = restoredSerializerSnapshot.restoreSerializer();
 			assertSerializerIsValid(
 					restoredSerializer,
+					true,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -296,7 +297,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 			assertThat(
 					upgradeCompatibility,
-					testSpecification.verifier.schemaCompatibilityMatcher());
+					testSpecification.verifier.schemaCompatibilityMatcher(testSpecification.migrationVersion));
 		}
 	}
 
@@ -325,6 +326,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			// .. and then assert that the upgraded serializer is valid with the migrated data
 			assertSerializerIsValid(
 					upgradedSerializer,
+					false,
 					migratedData,
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -346,6 +348,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 			TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer = upgradeCompatibility.getReconfiguredSerializer();
 			assertSerializerIsValid(
 					reconfiguredUpgradedSerializer,
+					false,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -366,6 +369,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 
 			assertSerializerIsValid(
 					upgradedSerializer,
+					false,
 					dataUnderTest(),
 					testSpecification.verifier.testDataMatcher());
 		}
@@ -378,14 +382,17 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 	 * <p>A serializer is valid, iff:
 	 * <ul>
 	 *     <li>1. The serializer can read and then write again the given serialized data.
-	 *     <li>2. The serializer can produce a serializer snapshot which can be written and then read back again.
+	 *     <li>2. The serializer can produce a serializer snapshot which can be written and then read
+	 *            back again.
 	 *     <li>3. The serializer's produced snapshot is capable of creating a restore serializer.
 	 *     <li>4. The restore serializer created from the serializer snapshot can read and then
-	 *            write again data written by step 1.
+	 *            write again data written by step 1, provided that the serializer is not already
+	 *            a restore serializer.
 	 * </ul>
 	 */
 	private static <T> void assertSerializerIsValid(
 			TypeSerializer<T> serializer,
+			boolean isRestoreSerializer,
 			DataInputView dataInput,
 			Matcher<T> testDataMatcher) throws Exception {
 
@@ -394,9 +401,11 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
 				serializer,
 				serializer,
 				testDataMatcher);
-		TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
-		TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
-		readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+		if (!isRestoreSerializer) {
+			TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
+			TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
+			readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+		}
 	}
 
 	// ------------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
index 4626924..f142401 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.hamcrest.Matcher;
 
@@ -228,7 +229,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAsIs();
 		}
 	}
@@ -330,7 +331,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<PojoAfterSchemaUpgrade>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAfterMigration();
 		}
 	}
@@ -395,7 +396,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<PojoWithStringField>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isIncompatible();
 		}
 	}
@@ -519,7 +520,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<BasePojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAfterMigration();
 		}
 	}
@@ -592,7 +593,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isIncompatible();
 		}
 	}
@@ -631,7 +632,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -680,7 +681,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -727,7 +728,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -772,7 +773,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
@@ -819,7 +820,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<StaticSchemaPojo>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer();
 		}
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
index 602f92e..cba9789 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -106,7 +106,7 @@ public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row,
 		}
 
 		@Override
-		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher() {
+		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher(MigrationVersion version) {
 			return TypeSerializerMatchers.isCompatibleAsIs();
 		}
 	}


[flink] 03/08: [FLINK-13632] Port RowSerializer upgrade test to TypeSerializerUpgradeTestBase

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44cc684df27089d4d40632b357ddaa1d1bca21ce
Author: klion26 <qc...@gmail.com>
AuthorDate: Sat Feb 15 14:32:35 2020 +0800

    [FLINK-13632] Port RowSerializer upgrade test to TypeSerializerUpgradeTestBase
---
 .../runtime/RowSerializerMigrationTest.java        |  66 ------------
 .../runtime/RowSerializerUpgradeTest.java          | 113 +++++++++++++++++++++
 .../test/resources/flink-1.6-row-serializer-data   | Bin 240 -> 0 bytes
 .../resources/flink-1.6-row-serializer-snapshot    | Bin 1444 -> 0 bytes
 .../test/resources/flink-1.7-row-serializer-data   | Bin 240 -> 0 bytes
 .../resources/flink-1.7-row-serializer-snapshot    | Bin 1454 -> 0 bytes
 6 files changed, 113 insertions(+), 66 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
deleted file mode 100644
index 1aacd4f..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSnapshot;
-import org.apache.flink.testutils.migration.MigrationVersion;
-import org.apache.flink.types.Row;
-
-import org.junit.Ignore;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Collection;
-
-/**
- * State migration test for {@link RowSerializer}.
- */
-@Ignore
-@RunWith(Parameterized.class)
-public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Row> {
-
-	public RowSerializerMigrationTest(TestSpecification<Row> testSpecification) {
-		super(testSpecification);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Parameterized.Parameters(name = "Test Specification = {0}")
-	public static Collection<TestSpecification<?>> testSpecifications() {
-
-		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
-
-		testSpecifications.add(
-			"row-serializer",
-			RowSerializer.class,
-			RowSerializerSnapshot.class,
-			RowSerializerMigrationTest::stringLongRowSupplier);
-
-		return testSpecifications.get();
-	}
-
-	private static TypeSerializer<Row> stringLongRowSupplier() {
-		RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-		return rowTypeInfo.createSerializer(new ExecutionConfig());
-	}
-}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
new file mode 100644
index 0000000..0c9a47b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.testutils.migration.MigrationVersion;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * A {@link TypeSerializerUpgradeTestBase} for {@link RowSerializer}, parameterized with one specification per entry of {@code migrationVersions}.
+ */
+@RunWith(Parameterized.class)
+public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row, Row> {
+
+	public RowSerializerUpgradeTest(TestSpecification<Row, Row> testSpecification) {
+		super(testSpecification);
+	}
+
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
+		ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
+		for (MigrationVersion migrationVersion : migrationVersions) { // one "row-serializer" specification per version
+			testSpecifications.add(
+				new TestSpecification<>(
+					"row-serializer",
+					migrationVersion,
+					RowSerializerSetup.class,
+					RowSerializerVerifier.class));
+		}
+		return testSpecifications;
+	}
+
+	public static TypeSerializer<Row> stringLongRowSupplier() {
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+		return rowTypeInfo.createSerializer(new ExecutionConfig()); // serializer for (String, Long) rows, built with a freshly constructed ExecutionConfig
+	}
+
+	// ----------------------------------------------------------------------------------------------
+	//  Specification for "row-serializer"
+	// ----------------------------------------------------------------------------------------------
+
+	/**
+	 * Supplies the prior serializer and the test row ("flink", 42L). This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}.
+	 */
+	public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
+		@Override
+		public TypeSerializer<Row> createPriorSerializer() {
+			return stringLongRowSupplier();
+		}
+
+		@Override
+		public Row createTestData() {
+			Row row = new Row(2);
+			row.setField(0, "flink");
+			row.setField(1, 42L);
+			return row;
+		}
+	}
+
+	/**
+	 * Supplies the upgraded serializer, a matcher for the row ("flink", 42L) and a compatible-as-is schema matcher. This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}.
+	 */
+	public static final class RowSerializerVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<Row> {
+		@Override
+		public TypeSerializer<Row> createUpgradedSerializer() {
+			return stringLongRowSupplier(); // same serializer definition as RowSerializerSetup#createPriorSerializer
+		}
+
+		@Override
+		public Matcher<Row> testDataMatcher() {
+			Row row = new Row(2);
+			row.setField(0, "flink");
+			row.setField(1, 42L);
+			return is(row); // same field values as RowSerializerSetup#createTestData
+		}
+
+		@Override
+		public Matcher<TypeSerializerSchemaCompatibility<Row>> schemaCompatibilityMatcher() {
+			return TypeSerializerMatchers.isCompatibleAsIs();
+		}
+	}
+}
diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-data b/flink-core/src/test/resources/flink-1.6-row-serializer-data
deleted file mode 100644
index 2cd0bd3..0000000
Binary files a/flink-core/src/test/resources/flink-1.6-row-serializer-data and /dev/null differ
diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot
deleted file mode 100644
index 9200e23..0000000
Binary files a/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot and /dev/null differ
diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-data b/flink-core/src/test/resources/flink-1.7-row-serializer-data
deleted file mode 100644
index 41aa078..0000000
Binary files a/flink-core/src/test/resources/flink-1.7-row-serializer-data and /dev/null differ
diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot
deleted file mode 100644
index f632dc4..0000000
Binary files a/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot and /dev/null differ