You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/17 11:53:55 UTC

[flink] 04/06: [FLINK-10778] [tests] Make new serializer compatibility tests more flexible in TypeSerializerSnapshotMigrationTestBase

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6124db0a3edf13c97e4559469e72e1729d849b1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 15 15:19:55 2019 +0100

    [FLINK-10778] [tests] Make new serializer compatibility tests more flexible in TypeSerializerSnapshotMigrationTestBase
    
    Before, the TypeSerializerSnapshotMigrationTestBase test base always
    only asserted that a new serializer is compatible as is with the
    previous serializer's snapshot.
    
    This makes the test code not resusable for cases where the new
    serializer provided isn't compatible as is, but other compatibility
    types, for example Kryo serializers that requires reconfiguration on
    restore. This allows the test base to express those cases.
---
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 .../TypeSerializerSnapshotMigrationTestBase.java   | 45 +++++++++++++++++--
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 50 +++++++++++-----------
 .../base/ListSerializerSnapshotMigrationTest.java  |  2 +-
 .../base/MapSerializerSnapshotMigrationTest.java   |  2 +-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 18 ++++----
 .../typeutils/AvroSerializerMigrationTest.java     |  4 +-
 ...ockableTypeSerializerSnapshotMigrationTest.java |  2 +-
 .../ListViewSerializerSnapshotMigrationTest.java   |  2 +-
 .../MapViewSerializerSnapshotMigrationTest.java    |  2 +-
 .../state/ArrayListSerializerMigrationTest.java    |  2 +-
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  2 +-
 12 files changed, 87 insertions(+), 48 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index 8a451e1..c8833b6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -54,7 +54,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 				EitherSerializer.class,
 				JavaEitherSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
+			.withNewSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);
 
@@ -65,7 +65,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 				GenericArraySerializer.class,
 				GenericArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
+			.withNewSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 55554be..6aa80b8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -25,6 +25,10 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,7 +46,6 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 /**
  * A test base for verifying {@link TypeSerializerSnapshot} migration.
@@ -82,7 +85,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 
 		TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
 
-		assertTrue(result.isCompatibleAsIs());
+		assertThat(result, hasSameCompatibilityType(testSpecification.expectedCompatibilityResult));
 	}
 
 	@Test
@@ -201,6 +204,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private final String name;
 		private final MigrationVersion testMigrationVersion;
 		private Supplier<? extends TypeSerializer<T>> serializerProvider;
+		private TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult;
 		private String snapshotDataLocation;
 		private String testDataLocation;
 		private int testDataCount;
@@ -231,8 +235,15 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			this.testMigrationVersion = testMigrationVersion;
 		}
 
-		public TestSpecification<T> withSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
+		public TestSpecification<T> withNewSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
+			return withNewSerializerProvider(serializerProvider, TypeSerializerSchemaCompatibility.compatibleAsIs());
+		}
+
+		public TestSpecification<T> withNewSerializerProvider(
+				Supplier<? extends TypeSerializer<T>> serializerProvider,
+				TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) {
 			this.serializerProvider = serializerProvider;
+			this.expectedCompatibilityResult = expectedCompatibilityResult;
 			return this;
 		}
 
@@ -277,4 +288,32 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------------------------
+
+	private <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> expectedCompatibilty) {
+		return new TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() {
+
+			@Override
+			protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
+				if (expectedCompatibilty.isCompatibleAsIs()) {
+					return testResultCompatibility.isCompatibleAsIs();
+				} else if (expectedCompatibilty.isIncompatible()) {
+					return testResultCompatibility.isCompatibleAfterMigration();
+				} else if (expectedCompatibilty.isIncompatible()) {
+					return testResultCompatibility.isIncompatible();
+				} else if (expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) {
+					return testResultCompatibility.isCompatibleWithReconfiguredSerializer();
+				}
+				return false;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("same compatibility as ").appendValue(expectedCompatibilty);
+			}
+		};
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
index bd7ac6c..93725c4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
@@ -64,7 +64,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BigDecSerializer.class,
 				BigDecSerializer.BigDecSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BigDecSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BigDecSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
 			.withTestData("flink-1.6-big-dec-serializer-data", 10);
 
@@ -75,7 +75,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BigIntSerializer.class,
 				BigIntSerializer.BigIntSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BigIntSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BigIntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
 			.withTestData("flink-1.6-big-int-serializer-data", 10);
 
@@ -86,7 +86,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BooleanSerializer.class,
 				BooleanSerializer.BooleanSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-serializer-data", 10);
 
@@ -97,7 +97,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BooleanValueSerializer.class,
 				BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-value-serializer-data", 10);
 
@@ -108,7 +108,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ByteSerializer.class,
 				ByteSerializer.ByteSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ByteSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ByteSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
 			.withTestData("flink-1.6-byte-serializer-data", 10);
 
@@ -119,7 +119,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ByteValueSerializer.class,
 				ByteValueSerializer.ByteValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ByteValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ByteValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
 			.withTestData("flink-1.6-byte-value-serializer-data", 10);
 
@@ -130,7 +130,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				CharSerializer.class,
 				CharSerializer.CharSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
 			.withTestData("flink-1.6-char-serializer-data", 10);
 
@@ -141,7 +141,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				CharValueSerializer.class,
 				CharValueSerializer.CharValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
 			.withTestData("flink-1.6-char-value-serializer-data", 10);
 
@@ -152,7 +152,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DateSerializer.class,
 				DateSerializer.DateSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DateSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
 			.withTestData("flink-1.6-date-serializer-data", 10);
 
@@ -163,7 +163,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DoubleSerializer.class,
 				DoubleSerializer.DoubleSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoubleSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoubleSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
 			.withTestData("flink-1.6-double-serializer-data", 10);
 
@@ -174,7 +174,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DoubleValueSerializer.class,
 				DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
 			.withTestData("flink-1.6-double-value-serializer-data", 10);
 
@@ -185,7 +185,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				FloatSerializer.class,
 				FloatSerializer.FloatSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
 			.withTestData("flink-1.6-float-serializer-data", 10);
 
@@ -196,7 +196,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				FloatValueSerializer.class,
 				FloatValueSerializer.FloatValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
 			.withTestData("flink-1.6-float-value-serializer-data", 10);
 
@@ -207,7 +207,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				IntSerializer.class,
 				IntSerializer.IntSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
 			.withTestData("flink-1.6-int-serializer-data", 10);
 
@@ -218,7 +218,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				IntValueSerializer.class,
 				IntValueSerializer.IntValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
 			.withTestData("flink-1.6-int-value-serializer-data", 10);
 
@@ -229,7 +229,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				LongSerializer.class,
 				LongSerializer.LongSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
 			.withTestData("flink-1.6-long-serializer-data", 10);
 
@@ -240,7 +240,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				LongValueSerializer.class,
 				LongValueSerializer.LongValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
 			.withTestData("flink-1.6-long-value-serializer-data", 10);
 
@@ -251,7 +251,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				NullValueSerializer.class,
 				NullValueSerializer.NullValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> NullValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> NullValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
 			.withTestData("flink-1.6-null-value-serializer-data", 10);
 
@@ -262,7 +262,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ShortSerializer.class,
 				ShortSerializer.ShortSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
 			.withTestData("flink-1.6-short-serializer-data", 10);
 
@@ -273,7 +273,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ShortValueSerializer.class,
 				ShortValueSerializer.ShortValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
 			.withTestData("flink-1.6-short-value-serializer-data", 10);
 
@@ -284,7 +284,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlDateSerializer.class,
 				SqlDateSerializer.SqlDateSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlDateSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlDateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
 			.withTestData("flink-1.6-sql-date-serializer-data", 10);
 
@@ -295,7 +295,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlTimeSerializer.class,
 				SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
 			.withTestData("flink-1.6-sql-time-serializer-data", 10);
 
@@ -306,7 +306,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlTimestampSerializer.class,
 				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
 			.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
 
@@ -317,7 +317,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				StringSerializer.class,
 				StringSerializer.StringSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
 			.withTestData("flink-1.6-string-serializer-data", 10);
 
@@ -328,7 +328,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				StringValueSerializer.class,
 				StringValueSerializer.StringValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
 			.withTestData("flink-1.6-string-value-serializer-data", 10);
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
index a355e1a..524a801 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
@@ -38,7 +38,7 @@ public class ListSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 					ListSerializer.class,
 					ListSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
index bb6dc95..be9f152 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
@@ -38,7 +38,7 @@ public class MapSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMi
 					MapSerializer.class,
 					MapSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
index 46d23a9..f6aa59f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
@@ -48,7 +48,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				BooleanPrimitiveArraySerializer.class,
 				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-primitive-array-serializer-data", 10);
 
@@ -59,7 +59,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				BytePrimitiveArraySerializer.class,
 				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-byte-primitive-array-serializer-data", 10);
 
@@ -70,7 +70,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				CharPrimitiveArraySerializer.class,
 				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-char-primitive-array-serializer-data", 10);
 
@@ -81,7 +81,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				DoublePrimitiveArraySerializer.class,
 				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-double-primitive-array-serializer-data", 10);
 
@@ -92,7 +92,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				FloatPrimitiveArraySerializer.class,
 				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-float-primitive-array-serializer-data", 10);
 
@@ -103,7 +103,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				IntPrimitiveArraySerializer.class,
 				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-int-primitive-array-serializer-data", 10);
 
@@ -114,7 +114,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				LongPrimitiveArraySerializer.class,
 				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-long-primitive-array-serializer-data", 10);
 
@@ -125,7 +125,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				ShortPrimitiveArraySerializer.class,
 				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-short-primitive-array-serializer-data", 10);
 
@@ -136,7 +136,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				StringArraySerializer.class,
 				StringArraySerializer.StringArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-array-serializer-snapshot")
 			.withTestData("flink-1.6-string-array-serializer-data", 10);
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index 1cc259b..019183e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -52,7 +52,7 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 				AvroSerializer.class,
 				AvroSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
+			.withNewSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
 			.withSnapshotDataLocation(GENERIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
@@ -61,7 +61,7 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 				AvroSerializer.class,
 				AvroSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new AvroSerializer<>(Address.class))
+			.withNewSerializerProvider(() -> new AvroSerializer<>(Address.class))
 			.withSnapshotDataLocation(SPECIFIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
index 431d708..cb911d6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -37,7 +37,7 @@ public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerS
 					Lockable.LockableTypeSerializer.class,
 					LockableTypeSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index ab474b38..69cbe08 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -39,7 +39,7 @@ public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 					ListViewSerializer.class,
 					ListViewSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
+				.withNewSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
index d3106d3..26281da 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -40,7 +40,7 @@ public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapsh
 					MapViewSerializer.class,
 					MapViewSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new MapViewSerializer<>(
+				.withNewSerializerProvider(() -> new MapViewSerializer<>(
 					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
index a66097d..ffbfd25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
@@ -39,7 +39,7 @@ public class ArrayListSerializerMigrationTest extends TypeSerializerSnapshotMigr
 					ArrayListSerializer.class,
 					ArrayListSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
index 9a84587..df836a3 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -40,7 +40,7 @@ public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSn
 					EitherSerializer.class,
 					ScalaEitherSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);