You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/17 11:53:54 UTC

[flink] 03/06: [FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aware of snapshot version

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e21c3d472e4cc136f1d709c99d66177234fb4d3e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 14 13:38:36 2019 +0100

    [FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aware of snapshot version
    
    Before, the tests in TypeSerializerSnapshotMigrationTestBase always
    assumed that the test serializer snapshots were written with Flink 1.6.
    
    This commit makes this flexible, so that we can allow subclasses to
    specify different snapshot versions for each TestSpecification.
---
 ...mpositeTypeSerializerSnapshotMigrationTest.java | 13 +++-
 .../TypeSerializerSnapshotMigrationTestBase.java   | 29 +++++++--
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 76 +++++++++++++++-------
 .../base/ListSerializerSnapshotMigrationTest.java  |  7 +-
 .../base/MapSerializerSnapshotMigrationTest.java   |  7 +-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 28 +++++---
 .../testutils/migration/MigrationVersion.java      |  4 ++
 .../typeutils/AvroSerializerMigrationTest.java     | 13 +++-
 ...ockableTypeSerializerSnapshotMigrationTest.java |  4 +-
 .../ListViewSerializerSnapshotMigrationTest.java   |  4 +-
 .../MapViewSerializerSnapshotMigrationTest.java    |  4 +-
 .../state/ArrayListSerializerMigrationTest.java    |  7 +-
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  4 +-
 13 files changed, 150 insertions(+), 50 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index 62135d7..8a451e1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
@@ -48,14 +49,22 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 		// Either<String, Integer>
 
-		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class)
+		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder(
+				"1.6-either",
+				EitherSerializer.class,
+				JavaEitherSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);
 
 		// GenericArray<String>
 
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
+		final TestSpecification<String[]> array = TestSpecification.<String[]>builder(
+				"1.6-generic-array",
+				GenericArraySerializer.class,
+				GenericArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index ea18309..55554be 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -125,13 +126,17 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer);
 
 		DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
-		return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(in, Thread.currentThread().getContextClassLoader(), null);
+		return readSnapshot(in);
 	}
 
 	private TypeSerializerSnapshot<ElementT> snapshotUnderTest() {
 		DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation());
 		try {
-			return readPre17SnapshotFormat(input);
+			if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) {
+				return readPre17SnapshotFormat(input);
+			} else {
+				return readSnapshot(input);
+			}
 		}
 		catch (IOException e) {
 			throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(),  e);
@@ -148,6 +153,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1;
 	}
 
+	private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException {
+		return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+			in, Thread.currentThread().getContextClassLoader(), null);
+	}
+
 	private DataInputView dataUnderTest() {
 		return contentsOf(testSpecification.getTestDataLocation());
 	}
@@ -189,6 +199,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private final Class<? extends TypeSerializer<T>> serializerType;
 		private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
 		private final String name;
+		private final MigrationVersion testMigrationVersion;
 		private Supplier<? extends TypeSerializer<T>> serializerProvider;
 		private String snapshotDataLocation;
 		private String testDataLocation;
@@ -198,22 +209,26 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		public static <T> TestSpecification<T> builder(
 			String name,
 			Class<? extends TypeSerializer> serializerClass,
-			Class<? extends TypeSerializerSnapshot> snapshotClass) {
+			Class<? extends TypeSerializerSnapshot> snapshotClass,
+			MigrationVersion testMigrationVersion) {
 
 			return new TestSpecification<>(
 				name,
 				(Class<? extends TypeSerializer<T>>) serializerClass,
-				(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass);
+				(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass,
+				testMigrationVersion);
 		}
 
 		private TestSpecification(
 			String name,
 			Class<? extends TypeSerializer<T>> serializerType,
-			Class<? extends TypeSerializerSnapshot<T>> snapshotClass) {
+			Class<? extends TypeSerializerSnapshot<T>> snapshotClass,
+			MigrationVersion testMigrationVersion) {
 
 			this.name = name;
 			this.serializerType = serializerType;
 			this.snapshotClass = snapshotClass;
+			this.testMigrationVersion = testMigrationVersion;
 		}
 
 		public TestSpecification<T> withSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
@@ -249,6 +264,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return resourcePath(this.snapshotDataLocation);
 		}
 
+		private MigrationVersion getTestMigrationVersion() {
+			return testMigrationVersion;
+		}
+
 		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
 			return snapshotClass;
 		}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
index 46abfc6..bd7ac6c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
@@ -61,7 +62,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BigDecimal> bigDec = TestSpecification.<BigDecimal>builder(
 				"1.6-big-dec",
 				BigDecSerializer.class,
-				BigDecSerializer.BigDecSerializerSnapshot.class)
+				BigDecSerializer.BigDecSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BigDecSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
 			.withTestData("flink-1.6-big-dec-serializer-data", 10);
@@ -71,7 +73,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BigInteger> bigInt = TestSpecification.<BigInteger>builder(
 				"1.6-big-int",
 				BigIntSerializer.class,
-				BigIntSerializer.BigIntSerializerSnapshot.class)
+				BigIntSerializer.BigIntSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BigIntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
 			.withTestData("flink-1.6-big-int-serializer-data", 10);
@@ -81,7 +84,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Boolean> booleanType = TestSpecification.<Boolean>builder(
 				"1.6-boolean",
 				BooleanSerializer.class,
-				BooleanSerializer.BooleanSerializerSnapshot.class)
+				BooleanSerializer.BooleanSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-serializer-data", 10);
@@ -91,7 +95,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BooleanValue> booleanValue = TestSpecification.<BooleanValue>builder(
 				"1.6-boolean-value",
 				BooleanValueSerializer.class,
-				BooleanValueSerializer.BooleanValueSerializerSnapshot.class)
+				BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-value-serializer-data", 10);
@@ -101,7 +106,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Byte> byteType = TestSpecification.<Byte>builder(
 				"1.6-byte",
 				ByteSerializer.class,
-				ByteSerializer.ByteSerializerSnapshot.class)
+				ByteSerializer.ByteSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ByteSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
 			.withTestData("flink-1.6-byte-serializer-data", 10);
@@ -111,7 +117,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<ByteValue> byteValue = TestSpecification.<ByteValue>builder(
 				"1.6-byte-value",
 				ByteValueSerializer.class,
-				ByteValueSerializer.ByteValueSerializerSnapshot.class)
+				ByteValueSerializer.ByteValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ByteValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
 			.withTestData("flink-1.6-byte-value-serializer-data", 10);
@@ -121,7 +128,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Character> charType = TestSpecification.<Character>builder(
 				"1.6-char",
 				CharSerializer.class,
-				CharSerializer.CharSerializerSnapshot.class)
+				CharSerializer.CharSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
 			.withTestData("flink-1.6-char-serializer-data", 10);
@@ -131,7 +139,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<CharValue> charValue = TestSpecification.<CharValue>builder(
 				"1.6-char-value",
 				CharValueSerializer.class,
-				CharValueSerializer.CharValueSerializerSnapshot.class)
+				CharValueSerializer.CharValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
 			.withTestData("flink-1.6-char-value-serializer-data", 10);
@@ -141,7 +150,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Date> javaDate = TestSpecification.<Date>builder(
 				"1.6-date",
 				DateSerializer.class,
-				DateSerializer.DateSerializerSnapshot.class)
+				DateSerializer.DateSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
 			.withTestData("flink-1.6-date-serializer-data", 10);
@@ -151,7 +161,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Double> doubleType = TestSpecification.<Double>builder(
 				"1.6-double",
 				DoubleSerializer.class,
-				DoubleSerializer.DoubleSerializerSnapshot.class)
+				DoubleSerializer.DoubleSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoubleSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
 			.withTestData("flink-1.6-double-serializer-data", 10);
@@ -161,7 +172,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<DoubleValue> doubleValue = TestSpecification.<DoubleValue>builder(
 				"1.6-double-value",
 				DoubleValueSerializer.class,
-				DoubleValueSerializer.DoubleValueSerializerSnapshot.class)
+				DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
 			.withTestData("flink-1.6-double-value-serializer-data", 10);
@@ -171,7 +183,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Float> floatType = TestSpecification.<Float>builder(
 				"1.6-float",
 				FloatSerializer.class,
-				FloatSerializer.FloatSerializerSnapshot.class)
+				FloatSerializer.FloatSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
 			.withTestData("flink-1.6-float-serializer-data", 10);
@@ -181,7 +194,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<FloatValue> floatValue = TestSpecification.<FloatValue>builder(
 				"1.6-float-value",
 				FloatValueSerializer.class,
-				FloatValueSerializer.FloatValueSerializerSnapshot.class)
+				FloatValueSerializer.FloatValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
 			.withTestData("flink-1.6-float-value-serializer-data", 10);
@@ -191,7 +205,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Integer> intType = TestSpecification.<Integer>builder(
 				"1.6-int",
 				IntSerializer.class,
-				IntSerializer.IntSerializerSnapshot.class)
+				IntSerializer.IntSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
 			.withTestData("flink-1.6-int-serializer-data", 10);
@@ -201,7 +216,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<IntValue> intValue = TestSpecification.<IntValue>builder(
 				"1.6-int-value",
 				IntValueSerializer.class,
-				IntValueSerializer.IntValueSerializerSnapshot.class)
+				IntValueSerializer.IntValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
 			.withTestData("flink-1.6-int-value-serializer-data", 10);
@@ -211,7 +227,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Long> longType = TestSpecification.<Long>builder(
 				"1.6-long",
 				LongSerializer.class,
-				LongSerializer.LongSerializerSnapshot.class)
+				LongSerializer.LongSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
 			.withTestData("flink-1.6-long-serializer-data", 10);
@@ -221,7 +238,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<LongValue> longValue = TestSpecification.<LongValue>builder(
 				"1.6-long-value",
 				LongValueSerializer.class,
-				LongValueSerializer.LongValueSerializerSnapshot.class)
+				LongValueSerializer.LongValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
 			.withTestData("flink-1.6-long-value-serializer-data", 10);
@@ -231,7 +249,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<NullValue> nullValue = TestSpecification.<NullValue>builder(
 				"1.6-null-value",
 				NullValueSerializer.class,
-				NullValueSerializer.NullValueSerializerSnapshot.class)
+				NullValueSerializer.NullValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> NullValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
 			.withTestData("flink-1.6-null-value-serializer-data", 10);
@@ -241,7 +260,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Short> shortType = TestSpecification.<Short>builder(
 				"1.6-short",
 				ShortSerializer.class,
-				ShortSerializer.ShortSerializerSnapshot.class)
+				ShortSerializer.ShortSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
 			.withTestData("flink-1.6-short-serializer-data", 10);
@@ -251,7 +271,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<ShortValue> shortValue = TestSpecification.<ShortValue>builder(
 				"1.6-short-value",
 				ShortValueSerializer.class,
-				ShortValueSerializer.ShortValueSerializerSnapshot.class)
+				ShortValueSerializer.ShortValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
 			.withTestData("flink-1.6-short-value-serializer-data", 10);
@@ -261,7 +282,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<java.sql.Date> sqlDate = TestSpecification.<java.sql.Date>builder(
 				"1.6-sql-date",
 				SqlDateSerializer.class,
-				SqlDateSerializer.SqlDateSerializerSnapshot.class)
+				SqlDateSerializer.SqlDateSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlDateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
 			.withTestData("flink-1.6-sql-date-serializer-data", 10);
@@ -271,7 +293,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Time> sqlTime = TestSpecification.<Time>builder(
 				"1.6-sql-time",
 				SqlTimeSerializer.class,
-				SqlTimeSerializer.SqlTimeSerializerSnapshot.class)
+				SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
 			.withTestData("flink-1.6-sql-time-serializer-data", 10);
@@ -281,7 +304,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Timestamp> sqlTimestamp = TestSpecification.<Timestamp>builder(
 				"1.6-sql-timestamp",
 				SqlTimestampSerializer.class,
-				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class)
+				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
 			.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
@@ -291,7 +315,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<String> stringType = TestSpecification.<String>builder(
 				"1.6-string",
 				StringSerializer.class,
-				StringSerializer.StringSerializerSnapshot.class)
+				StringSerializer.StringSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
 			.withTestData("flink-1.6-string-serializer-data", 10);
@@ -301,7 +326,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<StringValue> stringValue = TestSpecification.<StringValue>builder(
 				"1.6-string-value",
 				StringValueSerializer.class,
-				StringValueSerializer.StringValueSerializerSnapshot.class)
+				StringValueSerializer.StringValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
 			.withTestData("flink-1.6-string-value-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
index 02643fe..a355e1a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.List;
 
@@ -32,7 +33,11 @@ public class ListSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 
 	public ListSerializerSnapshotMigrationTest() {
 		super(
-			TestSpecification.<List<String>>builder("1.6-list-serializer", ListSerializer.class, ListSerializerSnapshot.class)
+			TestSpecification.<List<String>>builder(
+					"1.6-list-serializer",
+					ListSerializer.class,
+					ListSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
index ea66128..bb6dc95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.Map;
 
@@ -32,7 +33,11 @@ public class MapSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMi
 
 	public MapSerializerSnapshotMigrationTest() {
 		super(
-			TestSpecification.<Map<Integer, String>>builder("1.6-map-serializer", MapSerializer.class, MapSerializerSnapshot.class)
+			TestSpecification.<Map<Integer, String>>builder(
+					"1.6-map-serializer",
+					MapSerializer.class,
+					MapSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
index 41f61fb..46d23a9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -45,7 +46,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<boolean[]> booleanArray = TestSpecification.<boolean[]>builder(
 				"1.6-boolean-primitive-array",
 				BooleanPrimitiveArraySerializer.class,
-				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class)
+				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-primitive-array-serializer-data", 10);
@@ -55,7 +57,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<byte[]> byteArray = TestSpecification.<byte[]>builder(
 				"1.6-byte-primitive-array",
 				BytePrimitiveArraySerializer.class,
-				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class)
+				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-byte-primitive-array-serializer-data", 10);
@@ -65,7 +68,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<char[]> charArray = TestSpecification.<char[]>builder(
 				"1.6-char-primitive-array",
 				CharPrimitiveArraySerializer.class,
-				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class)
+				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-char-primitive-array-serializer-data", 10);
@@ -75,7 +79,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<double[]> doubleArray = TestSpecification.<double[]>builder(
 				"1.6-double-primitive-array",
 				DoublePrimitiveArraySerializer.class,
-				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class)
+				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-double-primitive-array-serializer-data", 10);
@@ -85,7 +90,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<float[]> floatArray = TestSpecification.<float[]>builder(
 				"1.6-float-primitive-array",
 				FloatPrimitiveArraySerializer.class,
-				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class)
+				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-float-primitive-array-serializer-data", 10);
@@ -95,7 +101,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<int[]> intArray = TestSpecification.<int[]>builder(
 				"1.6-int-primitive-array",
 				IntPrimitiveArraySerializer.class,
-				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class)
+				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-int-primitive-array-serializer-data", 10);
@@ -105,7 +112,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<long[]> longArray = TestSpecification.<long[]>builder(
 				"1.6-long-primitive-array",
 				LongPrimitiveArraySerializer.class,
-				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class)
+				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-long-primitive-array-serializer-data", 10);
@@ -115,7 +123,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<short[]> shortArray = TestSpecification.<short[]>builder(
 				"1.6-short-primitive-array",
 				ShortPrimitiveArraySerializer.class,
-				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class)
+				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-short-primitive-array-serializer-data", 10);
@@ -125,7 +134,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<String[]> stringArray = TestSpecification.<String[]>builder(
 				"1.6-string-array",
 				StringArraySerializer.class,
-				StringArraySerializer.StringArraySerializerSnapshot.class)
+				StringArraySerializer.StringArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-array-serializer-snapshot")
 			.withTestData("flink-1.6-string-array-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 87f6665..b6586d8 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -43,4 +43,8 @@ public enum MigrationVersion {
 	public String toString() {
 		return versionStr;
 	}
+
+	public boolean isNewerVersionThan(MigrationVersion otherVersion) {
+		return compareTo(otherVersion) > 0; // declaration order (assumes constants are listed oldest to newest - confirm); parsing versionStr as a double would rank "1.10" below "1.9"
+	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index a5f2a8e..1cc259b 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.formats.avro.generated.Address;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -46,12 +47,20 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<Object[]> testSpecifications() {
 
-		final TestSpecification<Address> genericCase = TestSpecification.<Address>builder("1.6-generic", AvroSerializer.class, AvroSerializerSnapshot.class)
+		final TestSpecification<Address> genericCase = TestSpecification.<Address>builder(
+				"1.6-generic",
+				AvroSerializer.class,
+				AvroSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
 			.withSnapshotDataLocation(GENERIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
-		final TestSpecification<Address> specificCase = TestSpecification.<Address>builder("1.6-specific", AvroSerializer.class, AvroSerializerSnapshot.class)
+		final TestSpecification<Address> specificCase = TestSpecification.<Address>builder(
+				"1.6-specific",
+				AvroSerializer.class,
+				AvroSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new AvroSerializer<>(Address.class))
 			.withSnapshotDataLocation(SPECIFIC_SNAPSHOT)
 			.withTestData(DATA, 10);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
index bb3b7f2..431d708 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link LockableTypeSerializerSnapshot}.
@@ -34,7 +35,8 @@ public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerS
 			TestSpecification.<Lockable<String>>builder(
 					"1.6-lockable-type-serializer",
 					Lockable.LockableTypeSerializer.class,
-					LockableTypeSerializerSnapshot.class)
+					LockableTypeSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index 5465ada..ab474b38 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link ListViewSerializerSnapshot}.
@@ -36,7 +37,8 @@ public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 			TestSpecification.<ListView<String>>builder(
 					"1.6-list-view-serializer",
 					ListViewSerializer.class,
-					ListViewSerializerSnapshot.class)
+					ListViewSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
index 66c7f17..d3106d3 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link MapViewSerializerSnapshot}.
@@ -37,7 +38,8 @@ public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapsh
 			TestSpecification.<MapView<Integer, String>>builder(
 					"1.6-map-view-serializer",
 					MapViewSerializer.class,
-					MapViewSerializerSnapshot.class)
+					MapViewSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new MapViewSerializer<>(
 					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
index b355795..a66097d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.ArrayList;
 
@@ -33,7 +34,11 @@ public class ArrayListSerializerMigrationTest extends TypeSerializerSnapshotMigr
 
 	public ArrayListSerializerMigrationTest() {
 		super(
-			TestSpecification.<ArrayList<String>>builder("1.6-arraylist-serializer", ArrayListSerializer.class, ArrayListSerializerSnapshot.class)
+			TestSpecification.<ArrayList<String>>builder(
+					"1.6-arraylist-serializer",
+					ArrayListSerializer.class,
+					ArrayListSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
index 9cd8b5d..9a84587 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import scala.util.Either;
 
 /**
@@ -37,7 +38,8 @@ public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSn
 			TestSpecification.<Either<Integer, String>>builder(
 					"1.6-scala-either-serializer",
 					EitherSerializer.class,
-					ScalaEitherSerializerSnapshot.class)
+					ScalaEitherSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)