You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/17 11:53:51 UTC

[flink] branch master updated (322e479 -> 76dd766)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 322e479  [FLINK-11069][core] Merge FutureUtil into FutureUtils
     new b30adb0  [hotfix] [tests] Remove unfruitful MigrationTestUtil class
     new fa3432a  [FLINK-10778] [tests] Move MigrationVersion to flink-core test utils
     new e21c3d4  [FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aware of snapshot version
     new c6124db  [FLINK-10778] [tests] Make new serializer compatibility tests more flexible in TypeSerializerSnapshotMigrationTestBase
     new 8a49d73  [FLINK-10778] [tests] Remove serializerSnapshotRestoresCurrentSerializer test in TypeSerializerSnapshotMigrationTestBase
     new 76dd766  [FLINK-10778] [tests] Extend all TypeSerializerSnapshotMigrationTestBase subclasses to test restoring from 1.7

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink-connector-filesystem/pom.xml             |   8 +
 .../fs/bucketing/BucketingSinkMigrationTest.java   |   9 +-
 .../flink-connector-kafka-base/pom.xml             |   8 +
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |  27 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java          |   2 +-
 flink-connectors/flink-connector-kinesis/pom.xml   |   8 +
 .../kinesis/FlinkKinesisConsumerMigrationTest.java |  21 +-
 .../kinesis/FlinkKinesisConsumerTest.java          |   2 +-
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  39 ++-
 .../TypeSerializerSnapshotMigrationTestBase.java   | 194 ++++++++++--
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 328 ++-------------------
 .../base/ListSerializerSnapshotMigrationTest.java  |  32 +-
 .../base/MapSerializerSnapshotMigrationTest.java   |  33 ++-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 156 ++++------
 .../testutils}/migration/MigrationVersion.java     |   6 +-
 ...lizer-data => flink-1.6-either-serializer-data} | Bin
 ...apshot => flink-1.6-either-serializer-snapshot} | Bin
 ...ata => flink-1.6-generic-array-serializer-data} | Bin
 ...=> flink-1.6-generic-array-serializer-snapshot} | Bin
 ...izer-data => flink-1.7-big-dec-serializer-data} | Bin
 .../flink-1.7-big-dec-serializer-snapshot          | Bin 0 -> 155 bytes
 .../resources/flink-1.7-big-int-serializer-data    | Bin 0 -> 150 bytes
 .../flink-1.7-big-int-serializer-snapshot          | Bin 0 -> 155 bytes
 ...ink-1.7-boolean-primitive-array-serializer-data | Bin 0 -> 90 bytes
 ...1.7-boolean-primitive-array-serializer-snapshot | Bin 0 -> 212 bytes
 ...izer-data => flink-1.7-boolean-serializer-data} |   0
 .../flink-1.7-boolean-serializer-snapshot          | Bin 0 -> 158 bytes
 ...ata => flink-1.7-boolean-value-serializer-data} | Bin
 .../flink-1.7-boolean-value-serializer-snapshot    | Bin 0 -> 173 bytes
 .../flink-1.7-byte-primitive-array-serializer-data | Bin 0 -> 100 bytes
 ...nk-1.7-byte-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 .../test/resources/flink-1.7-byte-serializer-data  |   1 +
 .../resources/flink-1.7-byte-serializer-snapshot   | Bin 0 -> 149 bytes
 .../resources/flink-1.7-byte-value-serializer-data |   1 +
 .../flink-1.7-byte-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../flink-1.7-char-primitive-array-serializer-data | Bin 0 -> 240 bytes
 ...nk-1.7-char-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 ...ializer-data => flink-1.7-char-serializer-data} | Bin
 .../resources/flink-1.7-char-serializer-snapshot   | Bin 0 -> 149 bytes
 ...r-data => flink-1.7-char-value-serializer-data} | Bin
 .../flink-1.7-char-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../test/resources/flink-1.7-date-serializer-data  | Bin 0 -> 80 bytes
 .../resources/flink-1.7-date-serializer-snapshot   | Bin 0 -> 149 bytes
 ...link-1.7-double-primitive-array-serializer-data | Bin 0 -> 360 bytes
 ...-1.7-double-primitive-array-serializer-snapshot | Bin 0 -> 209 bytes
 ...lizer-data => flink-1.7-double-serializer-data} |   0
 .../resources/flink-1.7-double-serializer-snapshot | Bin 0 -> 155 bytes
 ...data => flink-1.7-double-value-serializer-data} |   0
 .../flink-1.7-double-value-serializer-snapshot     | Bin 0 -> 170 bytes
 .../resources/flink-1.7-either-serializer-data     | Bin
 .../resources/flink-1.7-either-serializer-snapshot | Bin 0 -> 383 bytes
 ...flink-1.7-float-primitive-array-serializer-data | Bin 0 -> 160 bytes
 ...k-1.7-float-primitive-array-serializer-snapshot | Bin 0 -> 206 bytes
 ...alizer-data => flink-1.7-float-serializer-data} |   0
 .../resources/flink-1.7-float-serializer-snapshot  | Bin 0 -> 152 bytes
 ...-data => flink-1.7-float-value-serializer-data} |   0
 .../flink-1.7-float-value-serializer-snapshot      | Bin 0 -> 167 bytes
 .../flink-1.7-generic-array-serializer-data        | Bin 0 -> 280 bytes
 .../flink-1.7-generic-array-serializer-snapshot    | Bin 0 -> 270 bytes
 .../flink-1.7-int-primitive-array-serializer-data  | Bin 0 -> 160 bytes
 ...ink-1.7-int-primitive-array-serializer-snapshot | Bin 0 -> 200 bytes
 ...rializer-data => flink-1.7-int-serializer-data} | Bin
 .../resources/flink-1.7-int-serializer-snapshot    | Bin 0 -> 146 bytes
 ...er-data => flink-1.7-int-value-serializer-data} | Bin
 .../flink-1.7-int-value-serializer-snapshot        | Bin 0 -> 161 bytes
 .../test/resources/flink-1.7-list-serializer-data  | Bin
 .../resources/flink-1.7-list-serializer-snapshot   | Bin 0 -> 238 bytes
 .../flink-1.7-long-primitive-array-serializer-data | Bin 0 -> 280 bytes
 ...nk-1.7-long-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 ...ializer-data => flink-1.7-long-serializer-data} | Bin
 .../resources/flink-1.7-long-serializer-snapshot   | Bin 0 -> 149 bytes
 ...r-data => flink-1.7-long-value-serializer-data} | Bin
 .../flink-1.7-long-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../test/resources/flink-1.7-map-serializer-data   | Bin 0 -> 390 bytes
 .../resources/flink-1.7-map-serializer-snapshot    | Bin 0 -> 379 bytes
 .../resources/flink-1.7-null-value-serializer-data |   0
 .../flink-1.7-null-value-serializer-snapshot       | Bin 0 -> 164 bytes
 ...flink-1.7-short-primitive-array-serializer-data | Bin 0 -> 120 bytes
 ...k-1.7-short-primitive-array-serializer-snapshot | Bin 0 -> 206 bytes
 ...alizer-data => flink-1.7-short-serializer-data} | Bin
 .../resources/flink-1.7-short-serializer-snapshot  | Bin 0 -> 152 bytes
 ...-data => flink-1.7-short-value-serializer-data} | Bin
 .../flink-1.7-short-value-serializer-snapshot      | Bin 0 -> 167 bytes
 .../resources/flink-1.7-sql-date-serializer-data   | Bin 0 -> 80 bytes
 .../flink-1.7-sql-date-serializer-snapshot         | Bin 0 -> 158 bytes
 .../resources/flink-1.7-sql-time-serializer-data   | Bin 0 -> 80 bytes
 .../flink-1.7-sql-time-serializer-snapshot         | Bin 0 -> 158 bytes
 .../flink-1.7-sql-timestamp-serializer-data        | Bin 0 -> 120 bytes
 .../flink-1.7-sql-timestamp-serializer-snapshot    | Bin 0 -> 173 bytes
 .../flink-1.7-string-array-serializer-data         | Bin
 .../flink-1.7-string-array-serializer-snapshot     | Bin 0 -> 182 bytes
 ...lizer-data => flink-1.7-string-serializer-data} |   0
 .../resources/flink-1.7-string-serializer-snapshot | Bin 0 -> 155 bytes
 .../flink-1.7-string-value-serializer-data         |   1 +
 .../flink-1.7-string-value-serializer-snapshot     | Bin 0 -> 170 bytes
 .../typeutils/AvroSerializerMigrationTest.java     |  44 +--
 ...7-avro-generic-type-serializer-address-snapshot | Bin 0 -> 370 bytes
 ...=> flink-1.7-avro-type-serializer-address-data} | Bin
 ...flink-1.7-avro-type-serializer-address-snapshot | Bin 0 -> 380 bytes
 flink-fs-tests/pom.xml                             |   8 +
 .../ContinuousFileProcessingMigrationTest.java     |  15 +-
 ...ockableTypeSerializerSnapshotMigrationTest.java |  39 ++-
 .../flink/cep/operator/CEPMigrationTest.java       |  30 +-
 ...ata => flink-1.7-lockable-type-serializer-data} | Bin
 .../flink-1.7-lockable-type-serializer-snapshot    | Bin 0 -> 241 bytes
 .../ListViewSerializerSnapshotMigrationTest.java   |  39 ++-
 .../MapViewSerializerSnapshotMigrationTest.java    |  40 ++-
 .../resources/flink-1.7-list-view-serializer-data  | Bin
 .../flink-1.7-list-view-serializer-snapshot        | Bin 0 -> 314 bytes
 .../resources/flink-1.7-map-view-serializer-data   | Bin 0 -> 390 bytes
 .../flink-1.7-map-view-serializer-snapshot         | Bin 0 -> 454 bytes
 .../state/ArrayListSerializerMigrationTest.java    |  32 +-
 ...er-data => flink-1.7-arraylist-serializer-data} | Bin
 .../flink-1.7-arraylist-serializer-snapshot        | Bin 0 -> 231 bytes
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  38 ++-
 ...data => flink-1.7-scala-either-serializer-data} | Bin
 .../flink-1.7-scala-either-serializer-snapshot     | Bin 0 -> 381 bytes
 .../windowing/WindowOperatorMigrationTest.java     |  45 +--
 .../util/AbstractStreamOperatorTestHarness.java    |  10 +-
 .../util/migration/MigrationTestUtil.java          |  44 ---
 .../LegacyStatefulJobSavepointMigrationITCase.java |   2 +-
 .../utils/StatefulJobSavepointMigrationITCase.java |   2 +-
 .../StatefulJobWBroadcastStateMigrationITCase.java |   2 +-
 .../TypeSerializerSnapshotMigrationITCase.java     |   2 +-
 .../AbstractKeyedOperatorRestoreTestBase.java      |   2 +-
 .../restore/keyed/KeyedComplexChainTest.java       |   2 +-
 .../AbstractNonKeyedOperatorRestoreTestBase.java   |   2 +-
 .../operator/restore/unkeyed/ChainBreakTest.java   |   2 +-
 .../restore/unkeyed/ChainLengthDecreaseTest.java   |   2 +-
 .../restore/unkeyed/ChainLengthIncreaseTest.java   |   2 +-
 .../unkeyed/ChainLengthStatelessDecreaseTest.java  |   2 +-
 .../operator/restore/unkeyed/ChainOrderTest.java   |   2 +-
 .../operator/restore/unkeyed/ChainUnionTest.java   |   2 +-
 .../StatefulJobSavepointMigrationITCase.scala      |   4 +-
 ...StatefulJobWBroadcastStateMigrationITCase.scala |   2 +-
 135 files changed, 596 insertions(+), 696 deletions(-)
 rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/util => flink-core/src/test/java/org/apache/flink/testutils}/migration/MigrationVersion.java (86%)
 rename flink-core/src/test/resources/{flink-1.6-either-type-serializer-data => flink-1.6-either-serializer-data} (100%)
 rename flink-core/src/test/resources/{flink-1.6-either-type-serializer-snapshot => flink-1.6-either-serializer-snapshot} (100%)
 rename flink-core/src/test/resources/{flink-1.6-array-type-serializer-data => flink-1.6-generic-array-serializer-data} (100%)
 rename flink-core/src/test/resources/{flink-1.6-array-type-serializer-snapshot => flink-1.6-generic-array-serializer-snapshot} (100%)
 copy flink-core/src/test/resources/{flink-1.6-big-dec-serializer-data => flink-1.7-big-dec-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-big-dec-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-big-int-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-big-int-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-boolean-serializer-data => flink-1.7-boolean-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-boolean-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-void-serializer-data => flink-1.7-boolean-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-boolean-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-value-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-byte-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-char-value-serializer-data => flink-1.7-char-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-char-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-char-value-serializer-data => flink-1.7-char-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-char-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-date-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-date-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-double-value-serializer-data => flink-1.7-double-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-double-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-double-value-serializer-data => flink-1.7-double-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-double-value-serializer-snapshot
 copy flink-scala/src/test/resources/flink-1.6-scala-either-serializer-data => flink-core/src/test/resources/flink-1.7-either-serializer-data (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-either-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-float-value-serializer-data => flink-1.7-float-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-float-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-float-value-serializer-data => flink-1.7-float-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-float-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-generic-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-generic-array-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-int-value-serializer-data => flink-1.7-int-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-int-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-int-value-serializer-data => flink-1.7-int-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-int-value-serializer-snapshot
 copy flink-runtime/src/test/resources/flink-1.6-arraylist-serializer-data => flink-core/src/test/resources/flink-1.7-list-serializer-data (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-list-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-long-value-serializer-data => flink-1.7-long-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-long-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-long-value-serializer-data => flink-1.7-long-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-long-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-map-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-map-serializer-snapshot
 copy flink-yarn/src/test/resources/krb5.keytab => flink-core/src/test/resources/flink-1.7-null-value-serializer-data (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-null-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-short-value-serializer-data => flink-1.7-short-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-short-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-short-value-serializer-data => flink-1.7-short-value-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-short-value-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-date-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-date-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-time-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-time-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-snapshot
 copy flink-runtime/src/test/resources/flink-1.6-arraylist-serializer-data => flink-core/src/test/resources/flink-1.7-string-array-serializer-data (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-string-array-serializer-snapshot
 copy flink-core/src/test/resources/{flink-1.6-string-serializer-data => flink-1.7-string-serializer-data} (100%)
 create mode 100644 flink-core/src/test/resources/flink-1.7-string-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-string-value-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-string-value-serializer-snapshot
 create mode 100644 flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
 copy flink-formats/flink-avro/src/test/resources/{flink-1.6-avro-type-serializer-address-data => flink-1.7-avro-type-serializer-address-data} (100%)
 create mode 100644 flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
 copy flink-libraries/flink-cep/src/test/resources/{flink-1.6-lockable-type-serializer-data => flink-1.7-lockable-type-serializer-data} (100%)
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-snapshot
 copy flink-runtime/src/test/resources/flink-1.6-arraylist-serializer-data => flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-data (100%)
 create mode 100644 flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-snapshot
 create mode 100644 flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-data
 create mode 100644 flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-snapshot
 copy flink-runtime/src/test/resources/{flink-1.6-arraylist-serializer-data => flink-1.7-arraylist-serializer-data} (100%)
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-snapshot
 copy flink-scala/src/test/resources/{flink-1.6-scala-either-serializer-data => flink-1.7-scala-either-serializer-data} (100%)
 create mode 100644 flink-scala/src/test/resources/flink-1.7-scala-either-serializer-snapshot
 delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java


[flink] 06/06: [FLINK-10778] [tests] Extend all TypeSerializerSnapshotMigrationTestBase subclasses to test restoring from 1.7

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76dd766aab76befa95937476e5fa80a29099e86a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 15 18:26:16 2019 +0100

    [FLINK-10778] [tests] Extend all TypeSerializerSnapshotMigrationTestBase subclasses to test restoring from 1.7
    
    This closes #7504.
---
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  46 +--
 .../TypeSerializerSnapshotMigrationTestBase.java   | 109 +++++++
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 352 ++-------------------
 .../base/ListSerializerSnapshotMigrationTest.java  |  37 ++-
 .../base/MapSerializerSnapshotMigrationTest.java   |  38 ++-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 166 +++-------
 ...lizer-data => flink-1.6-either-serializer-data} | Bin
 ...apshot => flink-1.6-either-serializer-snapshot} | Bin
 ...ata => flink-1.6-generic-array-serializer-data} | Bin
 ...=> flink-1.6-generic-array-serializer-snapshot} | Bin
 .../resources/flink-1.7-big-dec-serializer-data    | Bin 0 -> 300 bytes
 .../flink-1.7-big-dec-serializer-snapshot          | Bin 0 -> 155 bytes
 .../resources/flink-1.7-big-int-serializer-data    | Bin 0 -> 150 bytes
 .../flink-1.7-big-int-serializer-snapshot          | Bin 0 -> 155 bytes
 ...ink-1.7-boolean-primitive-array-serializer-data | Bin 0 -> 90 bytes
 ...1.7-boolean-primitive-array-serializer-snapshot | Bin 0 -> 212 bytes
 .../resources/flink-1.7-boolean-serializer-data    |   1 +
 .../flink-1.7-boolean-serializer-snapshot          | Bin 0 -> 158 bytes
 .../flink-1.7-boolean-value-serializer-data        | Bin 0 -> 10 bytes
 .../flink-1.7-boolean-value-serializer-snapshot    | Bin 0 -> 173 bytes
 .../flink-1.7-byte-primitive-array-serializer-data | Bin 0 -> 100 bytes
 ...nk-1.7-byte-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 .../test/resources/flink-1.7-byte-serializer-data  |   1 +
 .../resources/flink-1.7-byte-serializer-snapshot   | Bin 0 -> 149 bytes
 .../resources/flink-1.7-byte-value-serializer-data |   1 +
 .../flink-1.7-byte-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../flink-1.7-char-primitive-array-serializer-data | Bin 0 -> 240 bytes
 ...nk-1.7-char-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 .../test/resources/flink-1.7-char-serializer-data  | Bin 0 -> 20 bytes
 .../resources/flink-1.7-char-serializer-snapshot   | Bin 0 -> 149 bytes
 .../resources/flink-1.7-char-value-serializer-data | Bin 0 -> 20 bytes
 .../flink-1.7-char-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../test/resources/flink-1.7-date-serializer-data  | Bin 0 -> 80 bytes
 .../resources/flink-1.7-date-serializer-snapshot   | Bin 0 -> 149 bytes
 ...link-1.7-double-primitive-array-serializer-data | Bin 0 -> 360 bytes
 ...-1.7-double-primitive-array-serializer-snapshot | Bin 0 -> 209 bytes
 .../resources/flink-1.7-double-serializer-data     |   1 +
 .../resources/flink-1.7-double-serializer-snapshot | Bin 0 -> 155 bytes
 .../flink-1.7-double-value-serializer-data         |   1 +
 .../flink-1.7-double-value-serializer-snapshot     | Bin 0 -> 170 bytes
 .../resources/flink-1.7-either-serializer-data     | Bin 0 -> 130 bytes
 .../resources/flink-1.7-either-serializer-snapshot | Bin 0 -> 383 bytes
 ...flink-1.7-float-primitive-array-serializer-data | Bin 0 -> 160 bytes
 ...k-1.7-float-primitive-array-serializer-snapshot | Bin 0 -> 206 bytes
 .../test/resources/flink-1.7-float-serializer-data |   1 +
 .../resources/flink-1.7-float-serializer-snapshot  | Bin 0 -> 152 bytes
 .../flink-1.7-float-value-serializer-data          |   1 +
 .../flink-1.7-float-value-serializer-snapshot      | Bin 0 -> 167 bytes
 .../flink-1.7-generic-array-serializer-data        | Bin 0 -> 280 bytes
 .../flink-1.7-generic-array-serializer-snapshot    | Bin 0 -> 270 bytes
 .../flink-1.7-int-primitive-array-serializer-data  | Bin 0 -> 160 bytes
 ...ink-1.7-int-primitive-array-serializer-snapshot | Bin 0 -> 200 bytes
 .../test/resources/flink-1.7-int-serializer-data   | Bin 0 -> 40 bytes
 .../resources/flink-1.7-int-serializer-snapshot    | Bin 0 -> 146 bytes
 .../resources/flink-1.7-int-value-serializer-data  | Bin 0 -> 40 bytes
 .../flink-1.7-int-value-serializer-snapshot        | Bin 0 -> 161 bytes
 .../test/resources/flink-1.7-list-serializer-data  | Bin 0 -> 240 bytes
 .../resources/flink-1.7-list-serializer-snapshot   | Bin 0 -> 238 bytes
 .../flink-1.7-long-primitive-array-serializer-data | Bin 0 -> 280 bytes
 ...nk-1.7-long-primitive-array-serializer-snapshot | Bin 0 -> 203 bytes
 .../test/resources/flink-1.7-long-serializer-data  | Bin 0 -> 80 bytes
 .../resources/flink-1.7-long-serializer-snapshot   | Bin 0 -> 149 bytes
 .../resources/flink-1.7-long-value-serializer-data | Bin 0 -> 80 bytes
 .../flink-1.7-long-value-serializer-snapshot       | Bin 0 -> 164 bytes
 .../test/resources/flink-1.7-map-serializer-data   | Bin 0 -> 390 bytes
 .../resources/flink-1.7-map-serializer-snapshot    | Bin 0 -> 379 bytes
 .../resources/flink-1.7-null-value-serializer-data |   0
 .../flink-1.7-null-value-serializer-snapshot       | Bin 0 -> 164 bytes
 ...flink-1.7-short-primitive-array-serializer-data | Bin 0 -> 120 bytes
 ...k-1.7-short-primitive-array-serializer-snapshot | Bin 0 -> 206 bytes
 .../test/resources/flink-1.7-short-serializer-data | Bin 0 -> 20 bytes
 .../resources/flink-1.7-short-serializer-snapshot  | Bin 0 -> 152 bytes
 .../flink-1.7-short-value-serializer-data          | Bin 0 -> 20 bytes
 .../flink-1.7-short-value-serializer-snapshot      | Bin 0 -> 167 bytes
 .../resources/flink-1.7-sql-date-serializer-data   | Bin 0 -> 80 bytes
 .../flink-1.7-sql-date-serializer-snapshot         | Bin 0 -> 158 bytes
 .../resources/flink-1.7-sql-time-serializer-data   | Bin 0 -> 80 bytes
 .../flink-1.7-sql-time-serializer-snapshot         | Bin 0 -> 158 bytes
 .../flink-1.7-sql-timestamp-serializer-data        | Bin 0 -> 120 bytes
 .../flink-1.7-sql-timestamp-serializer-snapshot    | Bin 0 -> 173 bytes
 .../flink-1.7-string-array-serializer-data         | Bin 0 -> 240 bytes
 .../flink-1.7-string-array-serializer-snapshot     | Bin 0 -> 182 bytes
 .../resources/flink-1.7-string-serializer-data     |   1 +
 .../resources/flink-1.7-string-serializer-snapshot | Bin 0 -> 155 bytes
 .../flink-1.7-string-value-serializer-data         |   1 +
 .../flink-1.7-string-value-serializer-snapshot     | Bin 0 -> 170 bytes
 .../typeutils/AvroSerializerMigrationTest.java     |  53 ++--
 ...7-avro-generic-type-serializer-address-snapshot | Bin 0 -> 370 bytes
 .../flink-1.7-avro-type-serializer-address-data    | Bin 0 -> 240 bytes
 ...flink-1.7-avro-type-serializer-address-snapshot | Bin 0 -> 380 bytes
 ...ockableTypeSerializerSnapshotMigrationTest.java |  39 ++-
 .../flink-1.7-lockable-type-serializer-data        | Bin 0 -> 160 bytes
 .../flink-1.7-lockable-type-serializer-snapshot    | Bin 0 -> 241 bytes
 .../ListViewSerializerSnapshotMigrationTest.java   |  39 ++-
 .../MapViewSerializerSnapshotMigrationTest.java    |  40 ++-
 .../resources/flink-1.7-list-view-serializer-data  | Bin 0 -> 240 bytes
 .../flink-1.7-list-view-serializer-snapshot        | Bin 0 -> 314 bytes
 .../resources/flink-1.7-map-view-serializer-data   | Bin 0 -> 390 bytes
 .../flink-1.7-map-view-serializer-snapshot         | Bin 0 -> 454 bytes
 .../state/ArrayListSerializerMigrationTest.java    |  37 ++-
 .../resources/flink-1.7-arraylist-serializer-data  | Bin 0 -> 240 bytes
 .../flink-1.7-arraylist-serializer-snapshot        | Bin 0 -> 231 bytes
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  40 ++-
 .../flink-1.7-scala-either-serializer-data         | Bin 0 -> 130 bytes
 .../flink-1.7-scala-either-serializer-snapshot     | Bin 0 -> 381 bytes
 105 files changed, 412 insertions(+), 593 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c8833b6..6294bef 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -25,12 +25,10 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
 import org.apache.flink.testutils.migration.MigrationVersion;
-import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
 
 /**
@@ -45,33 +43,21 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 	@SuppressWarnings("unchecked")
 	@Parameterized.Parameters(name = "Test Specification = {0}")
-	public static Collection<Object[]> testSpecifications() {
-
-		// Either<String, Integer>
-
-		final TestSpecification<Either<String, Integer>> either =TestSpecification.<Either<String, Integer>>builder(
-				"1.6-either",
-				EitherSerializer.class,
-				JavaEitherSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
-			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
-			.withTestData("flink-1.6-either-type-serializer-data", 10);
-
-		// GenericArray<String>
-
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder(
-				"1.6-generic-array",
-				GenericArraySerializer.class,
-				GenericArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
-			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
-			.withTestData("flink-1.6-array-type-serializer-data", 10);
-
-		return Arrays.asList(
-			new Object[]{either},
-			new Object[]{array}
-		);
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"either-serializer",
+			EitherSerializer.class,
+			JavaEitherSerializerSnapshot.class,
+			() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE));
+		testSpecifications.add(
+			"generic-array-serializer",
+			GenericArraySerializer.class,
+			GenericArraySerializerSnapshot.class,
+			() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 05143a8..57c939d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -37,9 +37,13 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -280,6 +284,111 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		}
 	}
 
+	/**
+	 * Utility class to help build a collection of {@link TestSpecification} for
+	 * multiple test migration versions. For each test specification added,
+	 * an entry will be added for each specified migration version.
+	 */
+	protected static final class TestSpecifications {
+
+		private static final int DEFAULT_TEST_DATA_COUNT = 10;
+		private static final String DEFAULT_SNAPSHOT_FILENAME_FORMAT = "flink-%s-%s-snapshot";
+		private static final String DEFAULT_TEST_DATA_FILENAME_FORMAT = "flink-%s-%s-data";
+
+		private final Collection<TestSpecification<?>> testSpecifications = new LinkedList<>();
+		private final MigrationVersion[] testVersions;
+
+		/**
+		 * Creates an empty collection whose specifications are each added once per given version.
+		 *
+		 * @param testVersions migration versions to create specifications for; at least one is required.
+		 */
+		public TestSpecifications(MigrationVersion... testVersions) {
+			checkArgument(testVersions.length > 0, "At least one test migration version should be specified.");
+			this.testVersions = testVersions.clone(); // defensive copy of the caller's varargs array
+		}
+
+		/**
+		 * Adds a test specification to be tested for all specified test versions.
+		 *
+		 * <p>This method adds the specification with pre-defined snapshot and data filenames,
+		 * with the format "flink-&lt;testVersion&gt;-&lt;specName&gt;-&lt;data/snapshot&gt;",
+		 * and each specification's test data count is assumed to always be 10.
+		 *
+		 * @param name test specification name.
+		 * @param serializerClass class of the current serializer.
+		 * @param snapshotClass class of the current serializer snapshot class.
+		 * @param serializerProvider provider for an instance of the current serializer.
+		 *
+		 * @param <T> type of the test data.
+		 */
+		public <T> void add(
+				String name,
+				Class<? extends TypeSerializer> serializerClass,
+				Class<? extends TypeSerializerSnapshot> snapshotClass,
+				Supplier<? extends TypeSerializer<T>> serializerProvider) {
+			// delegate to the general overload so that both variants build their specifications identically
+			add(name, serializerClass, snapshotClass, serializerProvider,
+				testVersion -> String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name),
+				testVersion -> String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
+				DEFAULT_TEST_DATA_COUNT);
+		}
+
+		/**
+		 * Adds a test specification to be tested for all specified test versions.
+		 *
+		 * @param name test specification name.
+		 * @param serializerClass class of the current serializer.
+		 * @param snapshotClass class of the current serializer snapshot class.
+		 * @param serializerProvider provider for an instance of the current serializer.
+		 * @param testSnapshotFilenameProvider provider for the filename of the test snapshot.
+		 * @param testDataFilenameProvider provider for the filename of the test data.
+		 * @param testDataCount expected number of records to be read in the test data files.
+		 *
+		 * @param <T> type of the test data.
+		 */
+		public <T> void add(
+				String name,
+				Class<? extends TypeSerializer> serializerClass,
+				Class<? extends TypeSerializerSnapshot> snapshotClass,
+				Supplier<? extends TypeSerializer<T>> serializerProvider,
+				TestResourceFilenameSupplier testSnapshotFilenameProvider,
+				TestResourceFilenameSupplier testDataFilenameProvider,
+				int testDataCount) {
+			for (MigrationVersion testVersion : testVersions) {
+				testSpecifications.add(
+					TestSpecification.<T>builder(
+						getSpecNameForVersion(name, testVersion),
+						serializerClass,
+						snapshotClass,
+						testVersion)
+					.withNewSerializerProvider(serializerProvider)
+					.withSnapshotDataLocation(testSnapshotFilenameProvider.get(testVersion))
+					.withTestData(testDataFilenameProvider.get(testVersion), testDataCount)
+				);
+			}
+		}
+
+		/**
+		 * Returns an unmodifiable view of all test specifications added so far.
+		 */
+		public Collection<TestSpecification<?>> get() {
+			return Collections.unmodifiableCollection(testSpecifications);
+		}
+
+		private static String getSpecNameForVersion(String baseName, MigrationVersion testVersion) {
+			return testVersion + "-" + baseName;
+		}
+	}
+
+	/**
+	 * Supplies the filename of a test resource (snapshot or data) for a given {@link MigrationVersion}.
+	 */
+	@FunctionalInterface
+	protected interface TestResourceFilenameSupplier {
+		String get(MigrationVersion testVersion);
+	}
+
 	// --------------------------------------------------------------------------------------------------------------
 	// Utilities
 	// --------------------------------------------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
index 93725c4..2ed4a85 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
@@ -19,29 +19,12 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
-
 import org.apache.flink.testutils.migration.MigrationVersion;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 
 /**
  * Migration tests for basic type serializers' snapshots.
@@ -55,309 +38,36 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 
 	@SuppressWarnings("unchecked")
 	@Parameterized.Parameters(name = "Test Specification = {0}")
-	public static Collection<Object> testSpecifications() {
-
-		// BigDecimal
-
-		final TestSpecification<BigDecimal> bigDec = TestSpecification.<BigDecimal>builder(
-				"1.6-big-dec",
-				BigDecSerializer.class,
-				BigDecSerializer.BigDecSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BigDecSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
-			.withTestData("flink-1.6-big-dec-serializer-data", 10);
-
-		// BigInteger
-
-		final TestSpecification<BigInteger> bigInt = TestSpecification.<BigInteger>builder(
-				"1.6-big-int",
-				BigIntSerializer.class,
-				BigIntSerializer.BigIntSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BigIntSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
-			.withTestData("flink-1.6-big-int-serializer-data", 10);
-
-		// Boolean
-
-		final TestSpecification<Boolean> booleanType = TestSpecification.<Boolean>builder(
-				"1.6-boolean",
-				BooleanSerializer.class,
-				BooleanSerializer.BooleanSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BooleanSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
-			.withTestData("flink-1.6-boolean-serializer-data", 10);
-
-		// BooleanValue
-
-		final TestSpecification<BooleanValue> booleanValue = TestSpecification.<BooleanValue>builder(
-				"1.6-boolean-value",
-				BooleanValueSerializer.class,
-				BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
-			.withTestData("flink-1.6-boolean-value-serializer-data", 10);
-
-		// Byte
-
-		final TestSpecification<Byte> byteType = TestSpecification.<Byte>builder(
-				"1.6-byte",
-				ByteSerializer.class,
-				ByteSerializer.ByteSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> ByteSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
-			.withTestData("flink-1.6-byte-serializer-data", 10);
-
-		// ByteValue
-
-		final TestSpecification<ByteValue> byteValue = TestSpecification.<ByteValue>builder(
-				"1.6-byte-value",
-				ByteValueSerializer.class,
-				ByteValueSerializer.ByteValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> ByteValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
-			.withTestData("flink-1.6-byte-value-serializer-data", 10);
-
-		// Character
-
-		final TestSpecification<Character> charType = TestSpecification.<Character>builder(
-				"1.6-char",
-				CharSerializer.class,
-				CharSerializer.CharSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> CharSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
-			.withTestData("flink-1.6-char-serializer-data", 10);
-
-		// CharValue
-
-		final TestSpecification<CharValue> charValue = TestSpecification.<CharValue>builder(
-				"1.6-char-value",
-				CharValueSerializer.class,
-				CharValueSerializer.CharValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> CharValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
-			.withTestData("flink-1.6-char-value-serializer-data", 10);
-
-		// java.util.Date
-
-		final TestSpecification<Date> javaDate = TestSpecification.<Date>builder(
-				"1.6-date",
-				DateSerializer.class,
-				DateSerializer.DateSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> DateSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
-			.withTestData("flink-1.6-date-serializer-data", 10);
-
-		// Double
-
-		final TestSpecification<Double> doubleType = TestSpecification.<Double>builder(
-				"1.6-double",
-				DoubleSerializer.class,
-				DoubleSerializer.DoubleSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> DoubleSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
-			.withTestData("flink-1.6-double-serializer-data", 10);
-
-		// DoubleValue
-
-		final TestSpecification<DoubleValue> doubleValue = TestSpecification.<DoubleValue>builder(
-				"1.6-double-value",
-				DoubleValueSerializer.class,
-				DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
-			.withTestData("flink-1.6-double-value-serializer-data", 10);
-
-		// Float
-
-		final TestSpecification<Float> floatType = TestSpecification.<Float>builder(
-				"1.6-float",
-				FloatSerializer.class,
-				FloatSerializer.FloatSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> FloatSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
-			.withTestData("flink-1.6-float-serializer-data", 10);
-
-		// FloatValue
-
-		final TestSpecification<FloatValue> floatValue = TestSpecification.<FloatValue>builder(
-				"1.6-float-value",
-				FloatValueSerializer.class,
-				FloatValueSerializer.FloatValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> FloatValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
-			.withTestData("flink-1.6-float-value-serializer-data", 10);
-
-		// Integer
-
-		final TestSpecification<Integer> intType = TestSpecification.<Integer>builder(
-				"1.6-int",
-				IntSerializer.class,
-				IntSerializer.IntSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> IntSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
-			.withTestData("flink-1.6-int-serializer-data", 10);
-
-		// IntValue
-
-		final TestSpecification<IntValue> intValue = TestSpecification.<IntValue>builder(
-				"1.6-int-value",
-				IntValueSerializer.class,
-				IntValueSerializer.IntValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> IntValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
-			.withTestData("flink-1.6-int-value-serializer-data", 10);
-
-		// Long
-
-		final TestSpecification<Long> longType = TestSpecification.<Long>builder(
-				"1.6-long",
-				LongSerializer.class,
-				LongSerializer.LongSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> LongSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
-			.withTestData("flink-1.6-long-serializer-data", 10);
-
-		// LongValue
-
-		final TestSpecification<LongValue> longValue = TestSpecification.<LongValue>builder(
-				"1.6-long-value",
-				LongValueSerializer.class,
-				LongValueSerializer.LongValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> LongValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
-			.withTestData("flink-1.6-long-value-serializer-data", 10);
-
-		// NullValue
-
-		final TestSpecification<NullValue> nullValue = TestSpecification.<NullValue>builder(
-				"1.6-null-value",
-				NullValueSerializer.class,
-				NullValueSerializer.NullValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> NullValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
-			.withTestData("flink-1.6-null-value-serializer-data", 10);
-
-		// Short
-
-		final TestSpecification<Short> shortType = TestSpecification.<Short>builder(
-				"1.6-short",
-				ShortSerializer.class,
-				ShortSerializer.ShortSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> ShortSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
-			.withTestData("flink-1.6-short-serializer-data", 10);
-
-		// ShortValue
-
-		final TestSpecification<ShortValue> shortValue = TestSpecification.<ShortValue>builder(
-				"1.6-short-value",
-				ShortValueSerializer.class,
-				ShortValueSerializer.ShortValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> ShortValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
-			.withTestData("flink-1.6-short-value-serializer-data", 10);
-
-		// java.sql.Date
-
-		final TestSpecification<java.sql.Date> sqlDate = TestSpecification.<java.sql.Date>builder(
-				"1.6-sql-date",
-				SqlDateSerializer.class,
-				SqlDateSerializer.SqlDateSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> SqlDateSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
-			.withTestData("flink-1.6-sql-date-serializer-data", 10);
-
-		// java.sql.Time
-
-		final TestSpecification<Time> sqlTime = TestSpecification.<Time>builder(
-				"1.6-sql-time",
-				SqlTimeSerializer.class,
-				SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
-			.withTestData("flink-1.6-sql-time-serializer-data", 10);
-
-		// java.sql.Timestamp
-
-		final TestSpecification<Timestamp> sqlTimestamp = TestSpecification.<Timestamp>builder(
-				"1.6-sql-timestamp",
-				SqlTimestampSerializer.class,
-				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
-			.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
-
-		// String
-
-		final TestSpecification<String> stringType = TestSpecification.<String>builder(
-				"1.6-string",
-				StringSerializer.class,
-				StringSerializer.StringSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> StringSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
-			.withTestData("flink-1.6-string-serializer-data", 10);
-
-		// StringValue
-
-		final TestSpecification<StringValue> stringValue = TestSpecification.<StringValue>builder(
-				"1.6-string-value",
-				StringValueSerializer.class,
-				StringValueSerializer.StringValueSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> StringValueSerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
-			.withTestData("flink-1.6-string-value-serializer-data", 10);
-
-		return Arrays.asList(
-			bigDec,
-			bigInt,
-			booleanType,
-			booleanValue,
-			byteType,
-			byteValue,
-			charType,
-			charValue,
-			javaDate,
-			doubleType,
-			doubleValue,
-			floatType,
-			floatValue,
-			intType,
-			intValue,
-			longType,
-			longValue,
-			nullValue,
-			shortType,
-			shortValue,
-			sqlDate,
-			sqlTime,
-			sqlTimestamp,
-			stringType,
-			stringValue
-		);
+	public static Collection<TestSpecification<?>> testSpecifications() {
+		// every specification added below is expanded into one entry per migration version listed here
+		final TestSpecifications specs = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		specs.add("big-dec-serializer", BigDecSerializer.class, BigDecSerializer.BigDecSerializerSnapshot.class, () -> BigDecSerializer.INSTANCE);
+		specs.add("big-int-serializer", BigIntSerializer.class, BigIntSerializer.BigIntSerializerSnapshot.class, () -> BigIntSerializer.INSTANCE);
+		specs.add("boolean-serializer", BooleanSerializer.class, BooleanSerializer.BooleanSerializerSnapshot.class, () -> BooleanSerializer.INSTANCE);
+		specs.add("boolean-value-serializer", BooleanValueSerializer.class, BooleanValueSerializer.BooleanValueSerializerSnapshot.class, () -> BooleanValueSerializer.INSTANCE);
+		specs.add("byte-serializer", ByteSerializer.class, ByteSerializer.ByteSerializerSnapshot.class, () -> ByteSerializer.INSTANCE);
+		specs.add("byte-value-serializer", ByteValueSerializer.class, ByteValueSerializer.ByteValueSerializerSnapshot.class, () -> ByteValueSerializer.INSTANCE);
+		specs.add("char-serializer", CharSerializer.class, CharSerializer.CharSerializerSnapshot.class, () -> CharSerializer.INSTANCE);
+		specs.add("char-value-serializer", CharValueSerializer.class, CharValueSerializer.CharValueSerializerSnapshot.class, () -> CharValueSerializer.INSTANCE);
+		specs.add("date-serializer", DateSerializer.class, DateSerializer.DateSerializerSnapshot.class, () -> DateSerializer.INSTANCE);
+		specs.add("double-serializer", DoubleSerializer.class, DoubleSerializer.DoubleSerializerSnapshot.class, () -> DoubleSerializer.INSTANCE);
+		specs.add("double-value-serializer", DoubleValueSerializer.class, DoubleValueSerializer.DoubleValueSerializerSnapshot.class, () -> DoubleValueSerializer.INSTANCE);
+		specs.add("float-serializer", FloatSerializer.class, FloatSerializer.FloatSerializerSnapshot.class, () -> FloatSerializer.INSTANCE);
+		specs.add("float-value-serializer", FloatValueSerializer.class, FloatValueSerializer.FloatValueSerializerSnapshot.class, () -> FloatValueSerializer.INSTANCE);
+		specs.add("int-serializer", IntSerializer.class, IntSerializer.IntSerializerSnapshot.class, () -> IntSerializer.INSTANCE);
+		specs.add("int-value-serializer", IntValueSerializer.class, IntValueSerializer.IntValueSerializerSnapshot.class, () -> IntValueSerializer.INSTANCE);
+		specs.add("long-serializer", LongSerializer.class, LongSerializer.LongSerializerSnapshot.class, () -> LongSerializer.INSTANCE);
+		specs.add("long-value-serializer", LongValueSerializer.class, LongValueSerializer.LongValueSerializerSnapshot.class, () -> LongValueSerializer.INSTANCE);
+		specs.add("null-value-serializer", NullValueSerializer.class, NullValueSerializer.NullValueSerializerSnapshot.class, () -> NullValueSerializer.INSTANCE);
+		specs.add("short-serializer", ShortSerializer.class, ShortSerializer.ShortSerializerSnapshot.class, () -> ShortSerializer.INSTANCE);
+		specs.add("short-value-serializer", ShortValueSerializer.class, ShortValueSerializer.ShortValueSerializerSnapshot.class, () -> ShortValueSerializer.INSTANCE);
+		specs.add("sql-date-serializer", SqlDateSerializer.class, SqlDateSerializer.SqlDateSerializerSnapshot.class, () -> SqlDateSerializer.INSTANCE);
+		specs.add("sql-time-serializer", SqlTimeSerializer.class, SqlTimeSerializer.SqlTimeSerializerSnapshot.class, () -> SqlTimeSerializer.INSTANCE);
+		specs.add("sql-timestamp-serializer", SqlTimestampSerializer.class, SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class, () -> SqlTimestampSerializer.INSTANCE);
+		specs.add("string-serializer", StringSerializer.class, StringSerializer.StringSerializerSnapshot.class, () -> StringSerializer.INSTANCE);
+		specs.add("string-value-serializer", StringValueSerializer.class, StringValueSerializer.StringValueSerializerSnapshot.class, () -> StringValueSerializer.INSTANCE);
+
+		return specs.get();
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
index 524a801..f3662c6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
@@ -20,27 +20,37 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
  * Migration test for the {@link ListSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class ListSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<List<String>> {
 
-	private static final String DATA = "flink-1.6-list-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-list-serializer-snapshot";
-
-	public ListSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<List<String>>builder(
-					"1.6-list-serializer",
-					ListSerializer.class,
-					ListSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "list-serializer";
+
+	public ListSerializerSnapshotMigrationTest(TestSpecification<List<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+		// the specification added below is expanded into one entry per migration version listed here
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			ListSerializer.class,
+			ListSerializerSnapshot.class,
+			() -> new ListSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
index be9f152..804eac4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
@@ -21,26 +21,36 @@ package org.apache.flink.api.common.typeutils.base;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
 import java.util.Map;
 
 /**
  * Migration test for the {@link MapSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class MapSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Map<Integer, String>> {
 
-	private static final String DATA = "flink-1.6-map-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-map-serializer-snapshot";
-
-	public MapSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<Map<Integer, String>>builder(
-					"1.6-map-serializer",
-					MapSerializer.class,
-					MapSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "map-serializer";
+
+	public MapSerializerSnapshotMigrationTest(TestSpecification<Map<Integer, String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+		// the specification added below is expanded into one entry per migration version listed here
+		final TestSpecifications specs = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		specs.add(
+			SPEC_NAME,
+			MapSerializer.class,
+			MapSerializerSnapshot.class,
+			() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		return specs.get();
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
index f6aa59f..6163e2c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
@@ -19,12 +19,11 @@
 package org.apache.flink.api.common.typeutils.base.array;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
-
 import org.apache.flink.testutils.migration.MigrationVersion;
+
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
 
 /**
@@ -39,118 +38,57 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 
 	@SuppressWarnings("unchecked")
 	@Parameterized.Parameters(name = "Test Specification = {0}")
-	public static Collection<Object> testSpecifications() {
-
-		// boolean[]
-
-		final TestSpecification<boolean[]> booleanArray = TestSpecification.<boolean[]>builder(
-				"1.6-boolean-primitive-array",
-				BooleanPrimitiveArraySerializer.class,
-				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-boolean-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-boolean-primitive-array-serializer-data", 10);
-
-		// byte[]
-
-		final TestSpecification<byte[]> byteArray = TestSpecification.<byte[]>builder(
-				"1.6-byte-primitive-array",
-				BytePrimitiveArraySerializer.class,
-				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-byte-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-byte-primitive-array-serializer-data", 10);
-
-		// char[]
-
-		final TestSpecification<char[]> charArray = TestSpecification.<char[]>builder(
-				"1.6-char-primitive-array",
-				CharPrimitiveArraySerializer.class,
-				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-char-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-char-primitive-array-serializer-data", 10);
-
-		// double[]
-
-		final TestSpecification<double[]> doubleArray = TestSpecification.<double[]>builder(
-				"1.6-double-primitive-array",
-				DoublePrimitiveArraySerializer.class,
-				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-double-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-double-primitive-array-serializer-data", 10);
-
-		// float[]
-
-		final TestSpecification<float[]> floatArray = TestSpecification.<float[]>builder(
-				"1.6-float-primitive-array",
-				FloatPrimitiveArraySerializer.class,
-				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-float-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-float-primitive-array-serializer-data", 10);
-
-		// int[]
-
-		final TestSpecification<int[]> intArray = TestSpecification.<int[]>builder(
-				"1.6-int-primitive-array",
-				IntPrimitiveArraySerializer.class,
-				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-int-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-int-primitive-array-serializer-data", 10);
-
-		// long[]
-
-		final TestSpecification<long[]> longArray = TestSpecification.<long[]>builder(
-				"1.6-long-primitive-array",
-				LongPrimitiveArraySerializer.class,
-				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-long-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-long-primitive-array-serializer-data", 10);
-
-		// short[]
-
-		final TestSpecification<short[]> shortArray = TestSpecification.<short[]>builder(
-				"1.6-short-primitive-array",
-				ShortPrimitiveArraySerializer.class,
-				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-short-primitive-array-serializer-snapshot")
-			.withTestData("flink-1.6-short-primitive-array-serializer-data", 10);
-
-		// String[]
-
-		final TestSpecification<String[]> stringArray = TestSpecification.<String[]>builder(
-				"1.6-string-array",
-				StringArraySerializer.class,
-				StringArraySerializer.StringArraySerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> StringArraySerializer.INSTANCE)
-			.withSnapshotDataLocation("flink-1.6-string-array-serializer-snapshot")
-			.withTestData("flink-1.6-string-array-serializer-data", 10);
-
-		return Arrays.asList(
-			booleanArray,
-			byteArray,
-			charArray,
-			doubleArray,
-			floatArray,
-			intArray,
-			longArray,
-			shortArray,
-			stringArray
-		);
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"boolean-primitive-array-serializer",
+			BooleanPrimitiveArraySerializer.class,
+			BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
+			() -> BooleanPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"byte-primitive-array-serializer",
+			BytePrimitiveArraySerializer.class,
+			BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
+			() -> BytePrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"char-primitive-array-serializer",
+			CharPrimitiveArraySerializer.class,
+			CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
+			() -> CharPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"double-primitive-array-serializer",
+			DoublePrimitiveArraySerializer.class,
+			DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
+			() -> DoublePrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"float-primitive-array-serializer",
+			FloatPrimitiveArraySerializer.class,
+			FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
+			() -> FloatPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"int-primitive-array-serializer",
+			IntPrimitiveArraySerializer.class,
+			IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
+			() -> IntPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"long-primitive-array-serializer",
+			LongPrimitiveArraySerializer.class,
+			LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
+			() -> LongPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"short-primitive-array-serializer",
+			ShortPrimitiveArraySerializer.class,
+			ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
+			() -> ShortPrimitiveArraySerializer.INSTANCE);
+		testSpecifications.add(
+			"string-array-serializer",
+			StringArraySerializer.class,
+			StringArraySerializer.StringArraySerializerSnapshot.class,
+			() -> StringArraySerializer.INSTANCE);
+
+		return testSpecifications.get();
 	}
 
 }
diff --git a/flink-core/src/test/resources/flink-1.6-either-type-serializer-data b/flink-core/src/test/resources/flink-1.6-either-serializer-data
similarity index 100%
rename from flink-core/src/test/resources/flink-1.6-either-type-serializer-data
rename to flink-core/src/test/resources/flink-1.6-either-serializer-data
diff --git a/flink-core/src/test/resources/flink-1.6-either-type-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-either-serializer-snapshot
similarity index 100%
rename from flink-core/src/test/resources/flink-1.6-either-type-serializer-snapshot
rename to flink-core/src/test/resources/flink-1.6-either-serializer-snapshot
diff --git a/flink-core/src/test/resources/flink-1.6-array-type-serializer-data b/flink-core/src/test/resources/flink-1.6-generic-array-serializer-data
similarity index 100%
rename from flink-core/src/test/resources/flink-1.6-array-type-serializer-data
rename to flink-core/src/test/resources/flink-1.6-generic-array-serializer-data
diff --git a/flink-core/src/test/resources/flink-1.6-array-type-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-generic-array-serializer-snapshot
similarity index 100%
rename from flink-core/src/test/resources/flink-1.6-array-type-serializer-snapshot
rename to flink-core/src/test/resources/flink-1.6-generic-array-serializer-snapshot
diff --git a/flink-core/src/test/resources/flink-1.7-big-dec-serializer-data b/flink-core/src/test/resources/flink-1.7-big-dec-serializer-data
new file mode 100644
index 0000000..77e75f8
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-big-dec-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-big-dec-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-big-dec-serializer-snapshot
new file mode 100644
index 0000000..06f174d
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-big-dec-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-big-int-serializer-data b/flink-core/src/test/resources/flink-1.7-big-int-serializer-data
new file mode 100644
index 0000000..5cabbfc
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-big-int-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-big-int-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-big-int-serializer-snapshot
new file mode 100644
index 0000000..3a8111e
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-big-int-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-data
new file mode 100644
index 0000000..96acd9c
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..7001c14
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-boolean-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-serializer-data b/flink-core/src/test/resources/flink-1.7-boolean-serializer-data
new file mode 100644
index 0000000..21808bb
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-boolean-serializer-data
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-boolean-serializer-snapshot
new file mode 100644
index 0000000..c65c7db
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-boolean-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data b/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data
new file mode 100644
index 0000000..cb43b5c
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-snapshot
new file mode 100644
index 0000000..d4e5cf8
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-boolean-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-data
new file mode 100644
index 0000000..8956669
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..a9cba34
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-byte-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-byte-serializer-data b/flink-core/src/test/resources/flink-1.7-byte-serializer-data
new file mode 100644
index 0000000..0120536
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-byte-serializer-data
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-byte-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-byte-serializer-snapshot
new file mode 100644
index 0000000..a09d139
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-byte-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-byte-value-serializer-data b/flink-core/src/test/resources/flink-1.7-byte-value-serializer-data
new file mode 100644
index 0000000..0120536
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-byte-value-serializer-data
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-byte-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-byte-value-serializer-snapshot
new file mode 100644
index 0000000..c37f0aa
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-byte-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data
new file mode 100644
index 0000000..918413b
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..3f7deae
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-serializer-data b/flink-core/src/test/resources/flink-1.7-char-serializer-data
new file mode 100644
index 0000000..9ecd9cd
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-char-serializer-snapshot
new file mode 100644
index 0000000..2671531
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-value-serializer-data b/flink-core/src/test/resources/flink-1.7-char-value-serializer-data
new file mode 100644
index 0000000..9ecd9cd
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-value-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-char-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-char-value-serializer-snapshot
new file mode 100644
index 0000000..3d10a36
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-char-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-date-serializer-data b/flink-core/src/test/resources/flink-1.7-date-serializer-data
new file mode 100644
index 0000000..d374944
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-date-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-date-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-date-serializer-snapshot
new file mode 100644
index 0000000..d5084d2
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-date-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-data
new file mode 100644
index 0000000..16e6183
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..3e4ee96
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-double-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-double-serializer-data b/flink-core/src/test/resources/flink-1.7-double-serializer-data
new file mode 100644
index 0000000..49fbf3f
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-double-serializer-data
@@ -0,0 +1 @@
+@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-double-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-double-serializer-snapshot
new file mode 100644
index 0000000..2cce3e5
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-double-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-double-value-serializer-data b/flink-core/src/test/resources/flink-1.7-double-value-serializer-data
new file mode 100644
index 0000000..49fbf3f
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-double-value-serializer-data
@@ -0,0 +1 @@
+@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n@	!���n
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-double-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-double-value-serializer-snapshot
new file mode 100644
index 0000000..1c87532
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-double-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-either-serializer-data b/flink-core/src/test/resources/flink-1.7-either-serializer-data
new file mode 100644
index 0000000..203067c
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-either-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot
new file mode 100644
index 0000000..7f2cf0f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-either-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-data
new file mode 100644
index 0000000..e5371bb
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..7e01a51
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-float-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-float-serializer-data b/flink-core/src/test/resources/flink-1.7-float-serializer-data
new file mode 100644
index 0000000..2d8b79f
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-float-serializer-data
@@ -0,0 +1 @@
+@I�@I�@I�@I�@I�@I�@I�@I�@I�@I�
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-float-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-float-serializer-snapshot
new file mode 100644
index 0000000..76e52b4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-float-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-float-value-serializer-data b/flink-core/src/test/resources/flink-1.7-float-value-serializer-data
new file mode 100644
index 0000000..2d8b79f
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-float-value-serializer-data
@@ -0,0 +1 @@
+@I�@I�@I�@I�@I�@I�@I�@I�@I�@I�
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-float-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-float-value-serializer-snapshot
new file mode 100644
index 0000000..c6d2d12
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-float-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-generic-array-serializer-data b/flink-core/src/test/resources/flink-1.7-generic-array-serializer-data
new file mode 100644
index 0000000..e2d6a23
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-generic-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-generic-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-generic-array-serializer-snapshot
new file mode 100644
index 0000000..9ace763
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-generic-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data
new file mode 100644
index 0000000..6966366
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..6ef72a2
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-serializer-data b/flink-core/src/test/resources/flink-1.7-int-serializer-data
new file mode 100644
index 0000000..a69d5c4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-int-serializer-snapshot
new file mode 100644
index 0000000..de2be82
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-value-serializer-data b/flink-core/src/test/resources/flink-1.7-int-value-serializer-data
new file mode 100644
index 0000000..a69d5c4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-value-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-int-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-int-value-serializer-snapshot
new file mode 100644
index 0000000..124b4e4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-int-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-list-serializer-data b/flink-core/src/test/resources/flink-1.7-list-serializer-data
new file mode 100644
index 0000000..7b6c68a
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-list-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-list-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-list-serializer-snapshot
new file mode 100644
index 0000000..a8efd3a
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-list-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-data
new file mode 100644
index 0000000..267bb12
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..eec2da3
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-serializer-data b/flink-core/src/test/resources/flink-1.7-long-serializer-data
new file mode 100644
index 0000000..0bf438f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-long-serializer-snapshot
new file mode 100644
index 0000000..85cfea9
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-value-serializer-data b/flink-core/src/test/resources/flink-1.7-long-value-serializer-data
new file mode 100644
index 0000000..0bf438f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-value-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-long-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-long-value-serializer-snapshot
new file mode 100644
index 0000000..93de3c5
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-long-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-map-serializer-data b/flink-core/src/test/resources/flink-1.7-map-serializer-data
new file mode 100644
index 0000000..4d7334a
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-map-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-map-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-map-serializer-snapshot
new file mode 100644
index 0000000..2d5f1f0
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-map-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-null-value-serializer-data b/flink-core/src/test/resources/flink-1.7-null-value-serializer-data
new file mode 100644
index 0000000..e69de29
diff --git a/flink-core/src/test/resources/flink-1.7-null-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-null-value-serializer-snapshot
new file mode 100644
index 0000000..c7a05b1
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-null-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-data b/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-data
new file mode 100644
index 0000000..9dcf64f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-snapshot
new file mode 100644
index 0000000..e07dc66
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-primitive-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-serializer-data b/flink-core/src/test/resources/flink-1.7-short-serializer-data
new file mode 100644
index 0000000..c1a8300
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-short-serializer-snapshot
new file mode 100644
index 0000000..3d79053
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-value-serializer-data b/flink-core/src/test/resources/flink-1.7-short-value-serializer-data
new file mode 100644
index 0000000..c1a8300
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-value-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-short-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-short-value-serializer-snapshot
new file mode 100644
index 0000000..91db2ab
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-short-value-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-date-serializer-data b/flink-core/src/test/resources/flink-1.7-sql-date-serializer-data
new file mode 100644
index 0000000..141ae41
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-date-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-date-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-sql-date-serializer-snapshot
new file mode 100644
index 0000000..ded2e33
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-date-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-time-serializer-data b/flink-core/src/test/resources/flink-1.7-sql-time-serializer-data
new file mode 100644
index 0000000..086ab90
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-time-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-time-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-sql-time-serializer-snapshot
new file mode 100644
index 0000000..f9a4597
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-time-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-data b/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-data
new file mode 100644
index 0000000..5639302
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-snapshot
new file mode 100644
index 0000000..300a1b6
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-sql-timestamp-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-string-array-serializer-data b/flink-core/src/test/resources/flink-1.7-string-array-serializer-data
new file mode 100644
index 0000000..7b6c68a
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-string-array-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-string-array-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-string-array-serializer-snapshot
new file mode 100644
index 0000000..adbd5a9
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-string-array-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-string-serializer-data b/flink-core/src/test/resources/flink-1.7-string-serializer-data
new file mode 100644
index 0000000..383ccad
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-string-serializer-data
@@ -0,0 +1 @@
+hello worldhello worldhello worldhello worldhello worldhello worldhello worldhello worldhello worldhello world
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-string-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-string-serializer-snapshot
new file mode 100644
index 0000000..a9d82ff
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-string-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-string-value-serializer-data b/flink-core/src/test/resources/flink-1.7-string-value-serializer-data
new file mode 100644
index 0000000..0857b1e
--- /dev/null
+++ b/flink-core/src/test/resources/flink-1.7-string-value-serializer-data
@@ -0,0 +1 @@
+hello-worldhello-worldhello-worldhello-worldhello-worldhello-worldhello-worldhello-worldhello-worldhello-world
\ No newline at end of file
diff --git a/flink-core/src/test/resources/flink-1.7-string-value-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-string-value-serializer-snapshot
new file mode 100644
index 0000000..a435f70
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-string-value-serializer-snapshot differ
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index 019183e..73b76a6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -20,55 +20,52 @@ package org.apache.flink.formats.avro.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.testutils.migration.MigrationVersion;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
 
 /**
  * Tests migrations for {@link AvroSerializerSnapshot}.
  */
 @RunWith(Parameterized.class)
-public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Address> {
 
-	private static final String DATA = "flink-1.6-avro-type-serializer-address-data";
-	private static final String SPECIFIC_SNAPSHOT = "flink-1.6-avro-type-serializer-address-snapshot";
-	private static final String GENERIC_SNAPSHOT = "flink-1.6-avro-generic-type-serializer-address-snapshot";
+	private static final String DATA_FILE_FORMAT = "flink-%s-avro-type-serializer-address-data";
+	private static final String SPECIFIC_SNAPSHOT_FILE_FORMAT = "flink-%s-avro-type-serializer-address-snapshot";
+	private static final String GENERIC_SNAPSHOT_FILE_FORMAT = "flink-%s-avro-generic-type-serializer-address-snapshot";
 
-	public AvroSerializerMigrationTest(TestSpecification<Object> testSpec) {
+	public AvroSerializerMigrationTest(TestSpecification<Address> testSpec) {
 		super(testSpec);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Parameterized.Parameters(name = "Test Specification = {0}")
-	public static Collection<Object[]> testSpecifications() {
+	public static Collection<TestSpecification<?>> testSpecifications() {
 
-		final TestSpecification<Address> genericCase = TestSpecification.<Address>builder(
-				"1.6-generic",
-				AvroSerializer.class,
-				AvroSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
-			.withSnapshotDataLocation(GENERIC_SNAPSHOT)
-			.withTestData(DATA, 10);
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
 
-		final TestSpecification<Address> specificCase = TestSpecification.<Address>builder(
-				"1.6-specific",
-				AvroSerializer.class,
-				AvroSerializerSnapshot.class,
-				MigrationVersion.v1_6)
-			.withNewSerializerProvider(() -> new AvroSerializer<>(Address.class))
-			.withSnapshotDataLocation(SPECIFIC_SNAPSHOT)
-			.withTestData(DATA, 10);
+		testSpecifications.add(
+			"generic-avro-serializer",
+			AvroSerializer.class,
+			AvroSerializerSnapshot.class,
+			() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()),
+			testVersion -> String.format(GENERIC_SNAPSHOT_FILE_FORMAT, testVersion),
+			testVersion -> String.format(DATA_FILE_FORMAT, testVersion),
+			10);
+		testSpecifications.add(
+			"specific-avro-serializer",
+			AvroSerializer.class,
+			AvroSerializerSnapshot.class,
+			() -> new AvroSerializer<>(Address.class),
+			testVersion -> String.format(SPECIFIC_SNAPSHOT_FILE_FORMAT, testVersion),
+			testVersion -> String.format(DATA_FILE_FORMAT, testVersion),
+			10);
 
-		return Arrays.asList(
-			new Object[]{genericCase},
-			new Object[]{specificCase}
-		);
+		return testSpecifications.get();
 	}
 
 }
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
new file mode 100644
index 0000000..f27d2dc
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot differ
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data
new file mode 100644
index 0000000..74acf72
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data differ
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
new file mode 100644
index 0000000..7c8f6c2
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
index cb911d6..4682617 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -22,24 +22,35 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
 /**
  * Migration test for the {@link LockableTypeSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Lockable<String>> {
 
-	private static final String DATA = "flink-1.6-lockable-type-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-lockable-type-serializer-snapshot";
-
-	public LockableTypeSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<Lockable<String>>builder(
-					"1.6-lockable-type-serializer",
-					Lockable.LockableTypeSerializer.class,
-					LockableTypeSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "lockable-type-serializer";
+
+	public LockableTypeSerializerSnapshotMigrationTest(TestSpecification<Lockable<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			Lockable.LockableTypeSerializer.class,
+			LockableTypeSerializerSnapshot.class,
+			() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-data
new file mode 100644
index 0000000..b175040
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-snapshot
new file mode 100644
index 0000000..42b3d45
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-lockable-type-serializer-snapshot differ
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index 69cbe08..1983046 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -24,24 +24,35 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
 /**
  * Migration test for the {@link ListViewSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<ListView<String>> {
 
-	private static final String DATA = "flink-1.6-list-view-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-list-view-serializer-snapshot";
-
-	public ListViewSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<ListView<String>>builder(
-					"1.6-list-view-serializer",
-					ListViewSerializer.class,
-					ListViewSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "list-view-serializer";
+
+	public ListViewSerializerSnapshotMigrationTest(TestSpecification<ListView<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			ListViewSerializer.class,
+			ListViewSerializerSnapshot.class,
+			() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)));
+
+		return testSpecifications.get();
 	}
 }
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
index 26281da..2679f6a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -25,25 +25,35 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
 /**
  * Migration test for the {@link MapViewSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<MapView<Integer, String>> {
 
-	private static final String DATA = "flink-1.6-map-view-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-map-view-serializer-snapshot";
-
-	public MapViewSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<MapView<Integer, String>>builder(
-					"1.6-map-view-serializer",
-					MapViewSerializer.class,
-					MapViewSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new MapViewSerializer<>(
-					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "map-view-serializer";
+
+	public MapViewSerializerSnapshotMigrationTest(TestSpecification<MapView<Integer, String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			MapViewSerializer.class,
+			MapViewSerializerSnapshot.class,
+			() -> new MapViewSerializer<>(new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)));
+
+		return testSpecifications.get();
 	}
 }
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-data b/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-data
new file mode 100644
index 0000000..7b6c68a
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-data differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-snapshot b/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-snapshot
new file mode 100644
index 0000000..2bc7b5c
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.7-list-view-serializer-snapshot differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-data b/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-data
new file mode 100644
index 0000000..4d7334a
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-data differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-snapshot b/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-snapshot
new file mode 100644
index 0000000..19003a0
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.7-map-view-serializer-snapshot differ
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
index ffbfd25..91b9658 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
@@ -22,27 +22,36 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
 import java.util.ArrayList;
+import java.util.Collection;
 
 /**
  * Migration test for the {@link ArrayListSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class ArrayListSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<ArrayList<String>> {
 
-	private static final String DATA = "flink-1.6-arraylist-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-arraylist-serializer-snapshot";
-
-	public ArrayListSerializerMigrationTest() {
-		super(
-			TestSpecification.<ArrayList<String>>builder(
-					"1.6-arraylist-serializer",
-					ArrayListSerializer.class,
-					ArrayListSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "arraylist-serializer";
+
+	public ArrayListSerializerMigrationTest(TestSpecification<ArrayList<String>> testSpecification) {
+		super(testSpecification);
 	}
 
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			ArrayListSerializer.class,
+			ArrayListSerializerSnapshot.class,
+			() -> new ArrayListSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
 }
diff --git a/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-data b/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-data
new file mode 100644
index 0000000..7b6c68a
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-snapshot
new file mode 100644
index 0000000..cf43209
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-snapshot differ
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
index df836a3..45339ab 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -21,29 +21,39 @@ package org.apache.flink.api.scala.typeutils;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
 import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
 import scala.util.Either;
 
 /**
  * Migration test for the {@link ScalaEitherSerializerSnapshot}.
  */
+@RunWith(Parameterized.class)
 public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Either<Integer, String>> {
 
-	private static final String DATA = "flink-1.6-scala-either-serializer-data";
-	private static final String SNAPSHOT = "flink-1.6-scala-either-serializer-snapshot";
-
-	public ScalaEitherSerializerSnapshotMigrationTest() {
-		super(
-			TestSpecification.<Either<Integer, String>>builder(
-					"1.6-scala-either-serializer",
-					EitherSerializer.class,
-					ScalaEitherSerializerSnapshot.class,
-					MigrationVersion.v1_6)
-				.withNewSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
-				.withSnapshotDataLocation(SNAPSHOT)
-				.withTestData(DATA, 10)
-		);
+	private static final String SPEC_NAME = "scala-either-serializer";
+
+	public ScalaEitherSerializerSnapshotMigrationTest(TestSpecification<Either<Integer, String>> testSpecification) {
+		super(testSpecification);
 	}
 
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			EitherSerializer.class,
+			ScalaEitherSerializerSnapshot.class,
+			() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
 }
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-data
new file mode 100644
index 0000000..203067c
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-snapshot
new file mode 100644
index 0000000..3e12866
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-either-serializer-snapshot differ


[flink] 05/06: [FLINK-10778] [tests] Remove serializerSnapshotRestoresCurrentSerializer test in TypeSerializerSnapshotMigrationTestBase

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a49d73690baa47dccf28a33d01feb817b6a3ed2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 15 15:30:06 2019 +0100

    [FLINK-10778] [tests] Remove serializerSnapshotRestoresCurrentSerializer test in TypeSerializerSnapshotMigrationTestBase
    
    The serializerSnapshotRestoresCurrentSerializer test asserts that the
    restored serializer instance from the snapshot is of the same type as
    the current serializer. This isn't necessarily true, as it is not a
    contract of the TypeSerializerSnapshot abstraction.
    
    Therefore, this test is removed.
---
 .../typeutils/TypeSerializerSnapshotMigrationTestBase.java    | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 6aa80b8..05143a8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -71,16 +71,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 	}
 
 	@Test
-	public void serializerSnapshotRestoresCurrentSerializer() {
-		TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
-
-		TypeSerializer<ElementT> restoredSerializer = snapshot.restoreSerializer();
-
-		assertThat(restoredSerializer, instanceOf(testSpecification.serializerType));
-	}
-
-	@Test
-	public void snapshotIsCompatibleWithTheCurrentSerializer() {
+	public void specifiedNewSerializerHasExpectedCompatibilityResultsWithSnapshot() {
 		TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
 
 		TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());


[flink] 02/06: [FLINK-10778] [tests] Move MigrationVersion to flink-core test utils

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa3432ac9f67732a7caa15951920d7309c6b832e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jan 11 16:49:58 2019 +0100

    [FLINK-10778] [tests] Move MigrationVersion to flink-core test utils
---
 flink-connectors/flink-connector-filesystem/pom.xml               | 8 ++++++++
 .../connectors/fs/bucketing/BucketingSinkMigrationTest.java       | 2 +-
 flink-connectors/flink-connector-kafka-base/pom.xml               | 8 ++++++++
 .../connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java     | 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml                  | 8 ++++++++
 .../connectors/kinesis/FlinkKinesisConsumerMigrationTest.java     | 2 +-
 .../org/apache/flink/testutils}/migration/MigrationVersion.java   | 2 +-
 flink-fs-tests/pom.xml                                            | 8 ++++++++
 .../flink/hdfstests/ContinuousFileProcessingMigrationTest.java    | 2 +-
 .../test/java/org/apache/flink/cep/operator/CEPMigrationTest.java | 2 +-
 .../runtime/operators/windowing/WindowOperatorMigrationTest.java  | 2 +-
 .../utils/LegacyStatefulJobSavepointMigrationITCase.java          | 2 +-
 .../checkpointing/utils/StatefulJobSavepointMigrationITCase.java  | 2 +-
 .../utils/StatefulJobWBroadcastStateMigrationITCase.java          | 2 +-
 .../test/migration/TypeSerializerSnapshotMigrationITCase.java     | 2 +-
 .../restore/keyed/AbstractKeyedOperatorRestoreTestBase.java       | 2 +-
 .../test/state/operator/restore/keyed/KeyedComplexChainTest.java  | 2 +-
 .../restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java  | 2 +-
 .../flink/test/state/operator/restore/unkeyed/ChainBreakTest.java | 2 +-
 .../state/operator/restore/unkeyed/ChainLengthDecreaseTest.java   | 2 +-
 .../state/operator/restore/unkeyed/ChainLengthIncreaseTest.java   | 2 +-
 .../restore/unkeyed/ChainLengthStatelessDecreaseTest.java         | 2 +-
 .../flink/test/state/operator/restore/unkeyed/ChainOrderTest.java | 2 +-
 .../flink/test/state/operator/restore/unkeyed/ChainUnionTest.java | 2 +-
 .../api/scala/migration/StatefulJobSavepointMigrationITCase.scala | 4 ++--
 .../migration/StatefulJobWBroadcastStateMigrationITCase.scala     | 2 +-
 26 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index b494530..4fb5b4e 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -91,6 +91,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 8d865b3..f480d3f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.Path;
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index 46583d5..29d5d2e 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -175,6 +175,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index fbb3732..27c10a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Assert;
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 6c45677..b131e0d 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -87,6 +87,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- Note:
 			The below dependencies are licenced under the Amazon Software License.
 			Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason.
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index f36b661..7160b15 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
similarity index 96%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
rename to flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 5ce24ed..87f6665 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.util.migration;
+package org.apache.flink.testutils.migration;
 
 /**
  * Enumeration for Flink versions, used in migration integration tests
diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index ba67573..a67422a 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -86,6 +86,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
 			<scope>test</scope>
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 0a3f75e..b3a45c9 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.commons.io.FileUtils;
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 0461bd6..d627bf5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.junit.Ignore;
 import org.junit.Test;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index d2ef199..09494cb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -53,7 +53,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
 import org.junit.Ignore;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
index 641689a..9c2ff79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
@@ -45,7 +45,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
 import org.junit.Ignore;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
index e3bc43b74..dab340a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index 456c34f..0187477 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index 377cea0..c7070b0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -36,9 +36,9 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
 import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
index 24531e7..69be968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -21,9 +21,9 @@ package org.apache.flink.test.state.operator.restore.keyed;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 04cd956..1577a1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.keyed;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Test state restoration for a keyed operator restore tests.
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index 320084f..8e35cfe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -21,9 +21,9 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index b061847..b6f4571 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index 0190ca0..ba7ea36 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index d0648ad..70597ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
index b0f0945..01e6de4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index cb2252e..1885e83 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index d8134d8..baee181 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.state.operator.restore.unkeyed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 7151477..a4e6489 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
 import org.apache.flink.util.Collector
 import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.runtime.state.{StateBackendLoader, FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.apache.flink.testutils.migration.MigrationVersion
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Ignore, Test}
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index e790a72..9e5ff10 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -39,8 +39,8 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.migration.MigrationVersion
 import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.testutils.migration.MigrationVersion
 import org.apache.flink.util.Collector
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized


[flink] 03/06: [FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aware of snapshot version

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e21c3d472e4cc136f1d709c99d66177234fb4d3e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 14 13:38:36 2019 +0100

    [FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aware of snapshot version
    
    Before, the tests in TypeSerializerSnapshotMigrationTestBase always
    assumed that the test serializer snapshots were written with Flink 1.6.
    
    This commit makes this flexible, so that we can allow subclasses to
    specify different snapshot versions for each TestSpecification.
---
 ...mpositeTypeSerializerSnapshotMigrationTest.java | 13 +++-
 .../TypeSerializerSnapshotMigrationTestBase.java   | 29 +++++++--
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 76 +++++++++++++++-------
 .../base/ListSerializerSnapshotMigrationTest.java  |  7 +-
 .../base/MapSerializerSnapshotMigrationTest.java   |  7 +-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 28 +++++---
 .../testutils/migration/MigrationVersion.java      |  4 ++
 .../typeutils/AvroSerializerMigrationTest.java     | 13 +++-
 ...ockableTypeSerializerSnapshotMigrationTest.java |  4 +-
 .../ListViewSerializerSnapshotMigrationTest.java   |  4 +-
 .../MapViewSerializerSnapshotMigrationTest.java    |  4 +-
 .../state/ArrayListSerializerMigrationTest.java    |  7 +-
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  4 +-
 13 files changed, 150 insertions(+), 50 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index 62135d7..8a451e1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
@@ -48,14 +49,22 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 		// Either<String, Integer>
 
-		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class)
+		final TestSpecification<Either<String, Integer>> either =TestSpecification.<Either<String, Integer>>builder(
+				"1.6-either",
+				EitherSerializer.class,
+				JavaEitherSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);
 
 		// GenericArray<String>
 
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
+		final TestSpecification<String[]> array = TestSpecification.<String[]>builder(
+				"1.6-generic-array",
+				GenericArraySerializer.class,
+				GenericArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index ea18309..55554be 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -125,13 +126,17 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer);
 
 		DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
-		return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(in, Thread.currentThread().getContextClassLoader(), null);
+		return readSnapshot(in);
 	}
 
 	private TypeSerializerSnapshot<ElementT> snapshotUnderTest() {
 		DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation());
 		try {
-			return readPre17SnapshotFormat(input);
+			if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) {
+				return readPre17SnapshotFormat(input);
+			} else {
+				return readSnapshot(input);
+			}
 		}
 		catch (IOException e) {
 			throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(),  e);
@@ -148,6 +153,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1;
 	}
 
+	private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException {
+		return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+			in, Thread.currentThread().getContextClassLoader(), null);
+	}
+
 	private DataInputView dataUnderTest() {
 		return contentsOf(testSpecification.getTestDataLocation());
 	}
@@ -189,6 +199,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private final Class<? extends TypeSerializer<T>> serializerType;
 		private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
 		private final String name;
+		private final MigrationVersion testMigrationVersion;
 		private Supplier<? extends TypeSerializer<T>> serializerProvider;
 		private String snapshotDataLocation;
 		private String testDataLocation;
@@ -198,22 +209,26 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		public static <T> TestSpecification<T> builder(
 			String name,
 			Class<? extends TypeSerializer> serializerClass,
-			Class<? extends TypeSerializerSnapshot> snapshotClass) {
+			Class<? extends TypeSerializerSnapshot> snapshotClass,
+			MigrationVersion testMigrationVersion) {
 
 			return new TestSpecification<>(
 				name,
 				(Class<? extends TypeSerializer<T>>) serializerClass,
-				(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass);
+				(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass,
+				testMigrationVersion);
 		}
 
 		private TestSpecification(
 			String name,
 			Class<? extends TypeSerializer<T>> serializerType,
-			Class<? extends TypeSerializerSnapshot<T>> snapshotClass) {
+			Class<? extends TypeSerializerSnapshot<T>> snapshotClass,
+			MigrationVersion testMigrationVersion) {
 
 			this.name = name;
 			this.serializerType = serializerType;
 			this.snapshotClass = snapshotClass;
+			this.testMigrationVersion = testMigrationVersion;
 		}
 
 		public TestSpecification<T> withSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
@@ -249,6 +264,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return resourcePath(this.snapshotDataLocation);
 		}
 
+		private MigrationVersion getTestMigrationVersion() {
+			return testMigrationVersion;
+		}
+
 		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
 			return snapshotClass;
 		}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
index 46abfc6..bd7ac6c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
@@ -61,7 +62,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BigDecimal> bigDec = TestSpecification.<BigDecimal>builder(
 				"1.6-big-dec",
 				BigDecSerializer.class,
-				BigDecSerializer.BigDecSerializerSnapshot.class)
+				BigDecSerializer.BigDecSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BigDecSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
 			.withTestData("flink-1.6-big-dec-serializer-data", 10);
@@ -71,7 +73,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BigInteger> bigInt = TestSpecification.<BigInteger>builder(
 				"1.6-big-int",
 				BigIntSerializer.class,
-				BigIntSerializer.BigIntSerializerSnapshot.class)
+				BigIntSerializer.BigIntSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BigIntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
 			.withTestData("flink-1.6-big-int-serializer-data", 10);
@@ -81,7 +84,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Boolean> booleanType = TestSpecification.<Boolean>builder(
 				"1.6-boolean",
 				BooleanSerializer.class,
-				BooleanSerializer.BooleanSerializerSnapshot.class)
+				BooleanSerializer.BooleanSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-serializer-data", 10);
@@ -91,7 +95,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<BooleanValue> booleanValue = TestSpecification.<BooleanValue>builder(
 				"1.6-boolean-value",
 				BooleanValueSerializer.class,
-				BooleanValueSerializer.BooleanValueSerializerSnapshot.class)
+				BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-value-serializer-data", 10);
@@ -101,7 +106,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Byte> byteType = TestSpecification.<Byte>builder(
 				"1.6-byte",
 				ByteSerializer.class,
-				ByteSerializer.ByteSerializerSnapshot.class)
+				ByteSerializer.ByteSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ByteSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
 			.withTestData("flink-1.6-byte-serializer-data", 10);
@@ -111,7 +117,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<ByteValue> byteValue = TestSpecification.<ByteValue>builder(
 				"1.6-byte-value",
 				ByteValueSerializer.class,
-				ByteValueSerializer.ByteValueSerializerSnapshot.class)
+				ByteValueSerializer.ByteValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ByteValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
 			.withTestData("flink-1.6-byte-value-serializer-data", 10);
@@ -121,7 +128,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Character> charType = TestSpecification.<Character>builder(
 				"1.6-char",
 				CharSerializer.class,
-				CharSerializer.CharSerializerSnapshot.class)
+				CharSerializer.CharSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
 			.withTestData("flink-1.6-char-serializer-data", 10);
@@ -131,7 +139,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<CharValue> charValue = TestSpecification.<CharValue>builder(
 				"1.6-char-value",
 				CharValueSerializer.class,
-				CharValueSerializer.CharValueSerializerSnapshot.class)
+				CharValueSerializer.CharValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
 			.withTestData("flink-1.6-char-value-serializer-data", 10);
@@ -141,7 +150,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Date> javaDate = TestSpecification.<Date>builder(
 				"1.6-date",
 				DateSerializer.class,
-				DateSerializer.DateSerializerSnapshot.class)
+				DateSerializer.DateSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
 			.withTestData("flink-1.6-date-serializer-data", 10);
@@ -151,7 +161,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Double> doubleType = TestSpecification.<Double>builder(
 				"1.6-double",
 				DoubleSerializer.class,
-				DoubleSerializer.DoubleSerializerSnapshot.class)
+				DoubleSerializer.DoubleSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoubleSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
 			.withTestData("flink-1.6-double-serializer-data", 10);
@@ -161,7 +172,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<DoubleValue> doubleValue = TestSpecification.<DoubleValue>builder(
 				"1.6-double-value",
 				DoubleValueSerializer.class,
-				DoubleValueSerializer.DoubleValueSerializerSnapshot.class)
+				DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
 			.withTestData("flink-1.6-double-value-serializer-data", 10);
@@ -171,7 +183,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Float> floatType = TestSpecification.<Float>builder(
 				"1.6-float",
 				FloatSerializer.class,
-				FloatSerializer.FloatSerializerSnapshot.class)
+				FloatSerializer.FloatSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
 			.withTestData("flink-1.6-float-serializer-data", 10);
@@ -181,7 +194,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<FloatValue> floatValue = TestSpecification.<FloatValue>builder(
 				"1.6-float-value",
 				FloatValueSerializer.class,
-				FloatValueSerializer.FloatValueSerializerSnapshot.class)
+				FloatValueSerializer.FloatValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
 			.withTestData("flink-1.6-float-value-serializer-data", 10);
@@ -191,7 +205,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Integer> intType = TestSpecification.<Integer>builder(
 				"1.6-int",
 				IntSerializer.class,
-				IntSerializer.IntSerializerSnapshot.class)
+				IntSerializer.IntSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
 			.withTestData("flink-1.6-int-serializer-data", 10);
@@ -201,7 +216,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<IntValue> intValue = TestSpecification.<IntValue>builder(
 				"1.6-int-value",
 				IntValueSerializer.class,
-				IntValueSerializer.IntValueSerializerSnapshot.class)
+				IntValueSerializer.IntValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
 			.withTestData("flink-1.6-int-value-serializer-data", 10);
@@ -211,7 +227,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Long> longType = TestSpecification.<Long>builder(
 				"1.6-long",
 				LongSerializer.class,
-				LongSerializer.LongSerializerSnapshot.class)
+				LongSerializer.LongSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
 			.withTestData("flink-1.6-long-serializer-data", 10);
@@ -221,7 +238,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<LongValue> longValue = TestSpecification.<LongValue>builder(
 				"1.6-long-value",
 				LongValueSerializer.class,
-				LongValueSerializer.LongValueSerializerSnapshot.class)
+				LongValueSerializer.LongValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
 			.withTestData("flink-1.6-long-value-serializer-data", 10);
@@ -231,7 +249,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<NullValue> nullValue = TestSpecification.<NullValue>builder(
 				"1.6-null-value",
 				NullValueSerializer.class,
-				NullValueSerializer.NullValueSerializerSnapshot.class)
+				NullValueSerializer.NullValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> NullValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
 			.withTestData("flink-1.6-null-value-serializer-data", 10);
@@ -241,7 +260,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Short> shortType = TestSpecification.<Short>builder(
 				"1.6-short",
 				ShortSerializer.class,
-				ShortSerializer.ShortSerializerSnapshot.class)
+				ShortSerializer.ShortSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
 			.withTestData("flink-1.6-short-serializer-data", 10);
@@ -251,7 +271,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<ShortValue> shortValue = TestSpecification.<ShortValue>builder(
 				"1.6-short-value",
 				ShortValueSerializer.class,
-				ShortValueSerializer.ShortValueSerializerSnapshot.class)
+				ShortValueSerializer.ShortValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
 			.withTestData("flink-1.6-short-value-serializer-data", 10);
@@ -261,7 +282,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<java.sql.Date> sqlDate = TestSpecification.<java.sql.Date>builder(
 				"1.6-sql-date",
 				SqlDateSerializer.class,
-				SqlDateSerializer.SqlDateSerializerSnapshot.class)
+				SqlDateSerializer.SqlDateSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlDateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
 			.withTestData("flink-1.6-sql-date-serializer-data", 10);
@@ -271,7 +293,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Time> sqlTime = TestSpecification.<Time>builder(
 				"1.6-sql-time",
 				SqlTimeSerializer.class,
-				SqlTimeSerializer.SqlTimeSerializerSnapshot.class)
+				SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
 			.withTestData("flink-1.6-sql-time-serializer-data", 10);
@@ -281,7 +304,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<Timestamp> sqlTimestamp = TestSpecification.<Timestamp>builder(
 				"1.6-sql-timestamp",
 				SqlTimestampSerializer.class,
-				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class)
+				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
 			.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
@@ -291,7 +315,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<String> stringType = TestSpecification.<String>builder(
 				"1.6-string",
 				StringSerializer.class,
-				StringSerializer.StringSerializerSnapshot.class)
+				StringSerializer.StringSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
 			.withTestData("flink-1.6-string-serializer-data", 10);
@@ -301,7 +326,8 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 		final TestSpecification<StringValue> stringValue = TestSpecification.<StringValue>builder(
 				"1.6-string-value",
 				StringValueSerializer.class,
-				StringValueSerializer.StringValueSerializerSnapshot.class)
+				StringValueSerializer.StringValueSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
 			.withTestData("flink-1.6-string-value-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
index 02643fe..a355e1a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.List;
 
@@ -32,7 +33,11 @@ public class ListSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 
 	public ListSerializerSnapshotMigrationTest() {
 		super(
-			TestSpecification.<List<String>>builder("1.6-list-serializer", ListSerializer.class, ListSerializerSnapshot.class)
+			TestSpecification.<List<String>>builder(
+					"1.6-list-serializer",
+					ListSerializer.class,
+					ListSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
index ea66128..bb6dc95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.Map;
 
@@ -32,7 +33,11 @@ public class MapSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMi
 
 	public MapSerializerSnapshotMigrationTest() {
 		super(
-			TestSpecification.<Map<Integer, String>>builder("1.6-map-serializer", MapSerializer.class, MapSerializerSnapshot.class)
+			TestSpecification.<Map<Integer, String>>builder(
+					"1.6-map-serializer",
+					MapSerializer.class,
+					MapSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
index 41f61fb..46d23a9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -45,7 +46,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<boolean[]> booleanArray = TestSpecification.<boolean[]>builder(
 				"1.6-boolean-primitive-array",
 				BooleanPrimitiveArraySerializer.class,
-				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class)
+				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-primitive-array-serializer-data", 10);
@@ -55,7 +57,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<byte[]> byteArray = TestSpecification.<byte[]>builder(
 				"1.6-byte-primitive-array",
 				BytePrimitiveArraySerializer.class,
-				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class)
+				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-byte-primitive-array-serializer-data", 10);
@@ -65,7 +68,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<char[]> charArray = TestSpecification.<char[]>builder(
 				"1.6-char-primitive-array",
 				CharPrimitiveArraySerializer.class,
-				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class)
+				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-char-primitive-array-serializer-data", 10);
@@ -75,7 +79,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<double[]> doubleArray = TestSpecification.<double[]>builder(
 				"1.6-double-primitive-array",
 				DoublePrimitiveArraySerializer.class,
-				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class)
+				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-double-primitive-array-serializer-data", 10);
@@ -85,7 +90,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<float[]> floatArray = TestSpecification.<float[]>builder(
 				"1.6-float-primitive-array",
 				FloatPrimitiveArraySerializer.class,
-				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class)
+				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-float-primitive-array-serializer-data", 10);
@@ -95,7 +101,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<int[]> intArray = TestSpecification.<int[]>builder(
 				"1.6-int-primitive-array",
 				IntPrimitiveArraySerializer.class,
-				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class)
+				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-int-primitive-array-serializer-data", 10);
@@ -105,7 +112,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<long[]> longArray = TestSpecification.<long[]>builder(
 				"1.6-long-primitive-array",
 				LongPrimitiveArraySerializer.class,
-				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class)
+				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-long-primitive-array-serializer-data", 10);
@@ -115,7 +123,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<short[]> shortArray = TestSpecification.<short[]>builder(
 				"1.6-short-primitive-array",
 				ShortPrimitiveArraySerializer.class,
-				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class)
+				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-short-primitive-array-serializer-data", 10);
@@ -125,7 +134,8 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 		final TestSpecification<String[]> stringArray = TestSpecification.<String[]>builder(
 				"1.6-string-array",
 				StringArraySerializer.class,
-				StringArraySerializer.StringArraySerializerSnapshot.class)
+				StringArraySerializer.StringArraySerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> StringArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-array-serializer-snapshot")
 			.withTestData("flink-1.6-string-array-serializer-data", 10);
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 87f6665..b6586d8 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -43,4 +43,8 @@ public enum MigrationVersion {
 	public String toString() {
 		return versionStr;
 	}
+
+	public boolean isNewerVersionThan(MigrationVersion otherVersion) {
+		return Double.valueOf(versionStr) > Double.valueOf(otherVersion.versionStr);
+	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index a5f2a8e..1cc259b 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.formats.avro.generated.Address;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.testutils.migration.MigrationVersion;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -46,12 +47,20 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<Object[]> testSpecifications() {
 
-		final TestSpecification<Address> genericCase = TestSpecification.<Address>builder("1.6-generic", AvroSerializer.class, AvroSerializerSnapshot.class)
+		final TestSpecification<Address> genericCase = TestSpecification.<Address>builder(
+				"1.6-generic",
+				AvroSerializer.class,
+				AvroSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
 			.withSnapshotDataLocation(GENERIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
-		final TestSpecification<Address> specificCase = TestSpecification.<Address>builder("1.6-specific", AvroSerializer.class, AvroSerializerSnapshot.class)
+		final TestSpecification<Address> specificCase = TestSpecification.<Address>builder(
+				"1.6-specific",
+				AvroSerializer.class,
+				AvroSerializerSnapshot.class,
+				MigrationVersion.v1_6)
 			.withSerializerProvider(() -> new AvroSerializer<>(Address.class))
 			.withSnapshotDataLocation(SPECIFIC_SNAPSHOT)
 			.withTestData(DATA, 10);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
index bb3b7f2..431d708 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link LockableTypeSerializerSnapshot}.
@@ -34,7 +35,8 @@ public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerS
 			TestSpecification.<Lockable<String>>builder(
 					"1.6-lockable-type-serializer",
 					Lockable.LockableTypeSerializer.class,
-					LockableTypeSerializerSnapshot.class)
+					LockableTypeSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index 5465ada..ab474b38 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link ListViewSerializerSnapshot}.
@@ -36,7 +37,8 @@ public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 			TestSpecification.<ListView<String>>builder(
 					"1.6-list-view-serializer",
 					ListViewSerializer.class,
-					ListViewSerializerSnapshot.class)
+					ListViewSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
index 66c7f17..d3106d3 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 /**
  * Migration test for the {@link MapViewSerializerSnapshot}.
@@ -37,7 +38,8 @@ public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapsh
 			TestSpecification.<MapView<Integer, String>>builder(
 					"1.6-map-view-serializer",
 					MapViewSerializer.class,
-					MapViewSerializerSnapshot.class)
+					MapViewSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new MapViewSerializer<>(
 					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
index b355795..a66097d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
 
 import java.util.ArrayList;
 
@@ -33,7 +34,11 @@ public class ArrayListSerializerMigrationTest extends TypeSerializerSnapshotMigr
 
 	public ArrayListSerializerMigrationTest() {
 		super(
-			TestSpecification.<ArrayList<String>>builder("1.6-arraylist-serializer", ArrayListSerializer.class, ArrayListSerializerSnapshot.class)
+			TestSpecification.<ArrayList<String>>builder(
+					"1.6-arraylist-serializer",
+					ArrayListSerializer.class,
+					ArrayListSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
index 9cd8b5d..9a84587 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTest
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 
+import org.apache.flink.testutils.migration.MigrationVersion;
 import scala.util.Either;
 
 /**
@@ -37,7 +38,8 @@ public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSn
 			TestSpecification.<Either<Integer, String>>builder(
 					"1.6-scala-either-serializer",
 					EitherSerializer.class,
-					ScalaEitherSerializerSnapshot.class)
+					ScalaEitherSerializerSnapshot.class,
+					MigrationVersion.v1_6)
 				.withSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)


[flink] 04/06: [FLINK-10778] [tests] Make new serializer compatibility tests more flexible in TypeSerializerSnapshotMigrationTestBase

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6124db0a3edf13c97e4559469e72e1729d849b1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 15 15:19:55 2019 +0100

    [FLINK-10778] [tests] Make new serializer compatibility tests more flexible in TypeSerializerSnapshotMigrationTestBase
    
    Before, the TypeSerializerSnapshotMigrationTestBase test base always
    only asserted that a new serializer is compatible as is with the
    previous serializer's snapshot.
    
    This makes the test code not reusable for cases where the new
    serializer provided isn't compatible as is but has another compatibility
    type, for example Kryo serializers that require reconfiguration on
    restore. This allows the test base to express those cases.
---
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 .../TypeSerializerSnapshotMigrationTestBase.java   | 45 +++++++++++++++++--
 .../BaseTypeSerializerSnapshotMigrationTest.java   | 50 +++++++++++-----------
 .../base/ListSerializerSnapshotMigrationTest.java  |  2 +-
 .../base/MapSerializerSnapshotMigrationTest.java   |  2 +-
 ...mitiveArraySerializerSnapshotMigrationTest.java | 18 ++++----
 .../typeutils/AvroSerializerMigrationTest.java     |  4 +-
 ...ockableTypeSerializerSnapshotMigrationTest.java |  2 +-
 .../ListViewSerializerSnapshotMigrationTest.java   |  2 +-
 .../MapViewSerializerSnapshotMigrationTest.java    |  2 +-
 .../state/ArrayListSerializerMigrationTest.java    |  2 +-
 ...ScalaEitherSerializerSnapshotMigrationTest.java |  2 +-
 12 files changed, 87 insertions(+), 48 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index 8a451e1..c8833b6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -54,7 +54,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 				EitherSerializer.class,
 				JavaEitherSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
+			.withNewSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);
 
@@ -65,7 +65,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 				GenericArraySerializer.class,
 				GenericArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
+			.withNewSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 55554be..6aa80b8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -25,6 +25,10 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,7 +46,6 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 /**
  * A test base for verifying {@link TypeSerializerSnapshot} migration.
@@ -82,7 +85,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 
 		TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
 
-		assertTrue(result.isCompatibleAsIs());
+		assertThat(result, hasSameCompatibilityType(testSpecification.expectedCompatibilityResult));
 	}
 
 	@Test
@@ -201,6 +204,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private final String name;
 		private final MigrationVersion testMigrationVersion;
 		private Supplier<? extends TypeSerializer<T>> serializerProvider;
+		private TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult;
 		private String snapshotDataLocation;
 		private String testDataLocation;
 		private int testDataCount;
@@ -231,8 +235,15 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			this.testMigrationVersion = testMigrationVersion;
 		}
 
-		public TestSpecification<T> withSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
+		public TestSpecification<T> withNewSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
+			return withNewSerializerProvider(serializerProvider, TypeSerializerSchemaCompatibility.compatibleAsIs());
+		}
+
+		public TestSpecification<T> withNewSerializerProvider(
+				Supplier<? extends TypeSerializer<T>> serializerProvider,
+				TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) {
 			this.serializerProvider = serializerProvider;
+			this.expectedCompatibilityResult = expectedCompatibilityResult;
 			return this;
 		}
 
@@ -277,4 +288,32 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------------------------
+
+	private <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> expectedCompatibilty) {
+		return new TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() {
+			// Matches only when the actual result is of the same compatibility kind as the expected one.
+			@Override
+			protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
+				if (expectedCompatibilty.isCompatibleAsIs()) {
+					return testResultCompatibility.isCompatibleAsIs();
+				} else if (expectedCompatibilty.isCompatibleAfterMigration()) {
+					return testResultCompatibility.isCompatibleAfterMigration();
+				} else if (expectedCompatibilty.isIncompatible()) {
+					return testResultCompatibility.isIncompatible();
+				} else if (expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) {
+					return testResultCompatibility.isCompatibleWithReconfiguredSerializer();
+				}
+				return false;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("same compatibility as ").appendValue(expectedCompatibilty);
+			}
+		};
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
index bd7ac6c..93725c4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BaseTypeSerializerSnapshotMigrationTest.java
@@ -64,7 +64,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BigDecSerializer.class,
 				BigDecSerializer.BigDecSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BigDecSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BigDecSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
 			.withTestData("flink-1.6-big-dec-serializer-data", 10);
 
@@ -75,7 +75,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BigIntSerializer.class,
 				BigIntSerializer.BigIntSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BigIntSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BigIntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
 			.withTestData("flink-1.6-big-int-serializer-data", 10);
 
@@ -86,7 +86,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BooleanSerializer.class,
 				BooleanSerializer.BooleanSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-serializer-data", 10);
 
@@ -97,7 +97,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				BooleanValueSerializer.class,
 				BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-value-serializer-data", 10);
 
@@ -108,7 +108,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ByteSerializer.class,
 				ByteSerializer.ByteSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ByteSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ByteSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
 			.withTestData("flink-1.6-byte-serializer-data", 10);
 
@@ -119,7 +119,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ByteValueSerializer.class,
 				ByteValueSerializer.ByteValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ByteValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ByteValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
 			.withTestData("flink-1.6-byte-value-serializer-data", 10);
 
@@ -130,7 +130,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				CharSerializer.class,
 				CharSerializer.CharSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
 			.withTestData("flink-1.6-char-serializer-data", 10);
 
@@ -141,7 +141,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				CharValueSerializer.class,
 				CharValueSerializer.CharValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
 			.withTestData("flink-1.6-char-value-serializer-data", 10);
 
@@ -152,7 +152,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DateSerializer.class,
 				DateSerializer.DateSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DateSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
 			.withTestData("flink-1.6-date-serializer-data", 10);
 
@@ -163,7 +163,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DoubleSerializer.class,
 				DoubleSerializer.DoubleSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoubleSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoubleSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
 			.withTestData("flink-1.6-double-serializer-data", 10);
 
@@ -174,7 +174,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				DoubleValueSerializer.class,
 				DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
 			.withTestData("flink-1.6-double-value-serializer-data", 10);
 
@@ -185,7 +185,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				FloatSerializer.class,
 				FloatSerializer.FloatSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
 			.withTestData("flink-1.6-float-serializer-data", 10);
 
@@ -196,7 +196,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				FloatValueSerializer.class,
 				FloatValueSerializer.FloatValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
 			.withTestData("flink-1.6-float-value-serializer-data", 10);
 
@@ -207,7 +207,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				IntSerializer.class,
 				IntSerializer.IntSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
 			.withTestData("flink-1.6-int-serializer-data", 10);
 
@@ -218,7 +218,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				IntValueSerializer.class,
 				IntValueSerializer.IntValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
 			.withTestData("flink-1.6-int-value-serializer-data", 10);
 
@@ -229,7 +229,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				LongSerializer.class,
 				LongSerializer.LongSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
 			.withTestData("flink-1.6-long-serializer-data", 10);
 
@@ -240,7 +240,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				LongValueSerializer.class,
 				LongValueSerializer.LongValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
 			.withTestData("flink-1.6-long-value-serializer-data", 10);
 
@@ -251,7 +251,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				NullValueSerializer.class,
 				NullValueSerializer.NullValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> NullValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> NullValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
 			.withTestData("flink-1.6-null-value-serializer-data", 10);
 
@@ -262,7 +262,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ShortSerializer.class,
 				ShortSerializer.ShortSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
 			.withTestData("flink-1.6-short-serializer-data", 10);
 
@@ -273,7 +273,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				ShortValueSerializer.class,
 				ShortValueSerializer.ShortValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
 			.withTestData("flink-1.6-short-value-serializer-data", 10);
 
@@ -284,7 +284,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlDateSerializer.class,
 				SqlDateSerializer.SqlDateSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlDateSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlDateSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
 			.withTestData("flink-1.6-sql-date-serializer-data", 10);
 
@@ -295,7 +295,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlTimeSerializer.class,
 				SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
 			.withTestData("flink-1.6-sql-time-serializer-data", 10);
 
@@ -306,7 +306,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				SqlTimestampSerializer.class,
 				SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
 			.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
 
@@ -317,7 +317,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				StringSerializer.class,
 				StringSerializer.StringSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
 			.withTestData("flink-1.6-string-serializer-data", 10);
 
@@ -328,7 +328,7 @@ public class BaseTypeSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 				StringValueSerializer.class,
 				StringValueSerializer.StringValueSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringValueSerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringValueSerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
 			.withTestData("flink-1.6-string-value-serializer-data", 10);
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
index a355e1a..524a801 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshotMigrationTest.java
@@ -38,7 +38,7 @@ public class ListSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 					ListSerializer.class,
 					ListSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new ListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
index bb6dc95..be9f152 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshotMigrationTest.java
@@ -38,7 +38,7 @@ public class MapSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMi
 					MapSerializer.class,
 					MapSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
index 46d23a9..f6aa59f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArraySerializerSnapshotMigrationTest.java
@@ -48,7 +48,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				BooleanPrimitiveArraySerializer.class,
 				BooleanPrimitiveArraySerializer.BooleanPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BooleanPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-boolean-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-boolean-primitive-array-serializer-data", 10);
 
@@ -59,7 +59,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				BytePrimitiveArraySerializer.class,
 				BytePrimitiveArraySerializer.BytePrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> BytePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-byte-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-byte-primitive-array-serializer-data", 10);
 
@@ -70,7 +70,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				CharPrimitiveArraySerializer.class,
 				CharPrimitiveArraySerializer.CharPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> CharPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-char-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-char-primitive-array-serializer-data", 10);
 
@@ -81,7 +81,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				DoublePrimitiveArraySerializer.class,
 				DoublePrimitiveArraySerializer.DoublePrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> DoublePrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-double-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-double-primitive-array-serializer-data", 10);
 
@@ -92,7 +92,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				FloatPrimitiveArraySerializer.class,
 				FloatPrimitiveArraySerializer.FloatPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> FloatPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-float-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-float-primitive-array-serializer-data", 10);
 
@@ -103,7 +103,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				IntPrimitiveArraySerializer.class,
 				IntPrimitiveArraySerializer.IntPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> IntPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-int-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-int-primitive-array-serializer-data", 10);
 
@@ -114,7 +114,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				LongPrimitiveArraySerializer.class,
 				LongPrimitiveArraySerializer.LongPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> LongPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-long-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-long-primitive-array-serializer-data", 10);
 
@@ -125,7 +125,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				ShortPrimitiveArraySerializer.class,
 				ShortPrimitiveArraySerializer.ShortPrimitiveArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> ShortPrimitiveArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-short-primitive-array-serializer-snapshot")
 			.withTestData("flink-1.6-short-primitive-array-serializer-data", 10);
 
@@ -136,7 +136,7 @@ public class PrimitiveArraySerializerSnapshotMigrationTest extends TypeSerialize
 				StringArraySerializer.class,
 				StringArraySerializer.StringArraySerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> StringArraySerializer.INSTANCE)
+			.withNewSerializerProvider(() -> StringArraySerializer.INSTANCE)
 			.withSnapshotDataLocation("flink-1.6-string-array-serializer-snapshot")
 			.withTestData("flink-1.6-string-array-serializer-data", 10);
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index 1cc259b..019183e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -52,7 +52,7 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 				AvroSerializer.class,
 				AvroSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
+			.withNewSerializerProvider(() -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()))
 			.withSnapshotDataLocation(GENERIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
@@ -61,7 +61,7 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 				AvroSerializer.class,
 				AvroSerializerSnapshot.class,
 				MigrationVersion.v1_6)
-			.withSerializerProvider(() -> new AvroSerializer<>(Address.class))
+			.withNewSerializerProvider(() -> new AvroSerializer<>(Address.class))
 			.withSnapshotDataLocation(SPECIFIC_SNAPSHOT)
 			.withTestData(DATA, 10);
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
index 431d708..cb911d6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -37,7 +37,7 @@ public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerS
 					Lockable.LockableTypeSerializer.class,
 					LockableTypeSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index ab474b38..69cbe08 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -39,7 +39,7 @@ public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnaps
 					ListViewSerializer.class,
 					ListViewSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
+				.withNewSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
index d3106d3..26281da 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -40,7 +40,7 @@ public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapsh
 					MapViewSerializer.class,
 					MapViewSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new MapViewSerializer<>(
+				.withNewSerializerProvider(() -> new MapViewSerializer<>(
 					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
index a66097d..ffbfd25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ArrayListSerializerMigrationTest.java
@@ -39,7 +39,7 @@ public class ArrayListSerializerMigrationTest extends TypeSerializerSnapshotMigr
 					ArrayListSerializer.class,
 					ArrayListSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new ArrayListSerializer<>(StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
index 9a84587..df836a3 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -40,7 +40,7 @@ public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSn
 					EitherSerializer.class,
 					ScalaEitherSerializerSnapshot.class,
 					MigrationVersion.v1_6)
-				.withSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
+				.withNewSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
 				.withSnapshotDataLocation(SNAPSHOT)
 				.withTestData(DATA, 10)
 		);


[flink] 01/06: [hotfix] [tests] Remove unfruitful MigrationTestUtil class

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b30adb0ce90d8f81f8028aeb5d541a3a67e6543d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jan 10 14:58:43 2019 +0100

    [hotfix] [tests] Remove unfruitful MigrationTestUtil class
    
    That utility class had a single helper method, restoreFromSnapshot,
    which accepts the target snapshot's Flink version. This was useful
    before, because the way to restore snapshots was a bit different for
    Flink <= 1.1 and newer versions.
    
    Since we now no longer support compatibility for 1.1 versions and
    below, this helper method is simply forwarding the restore operation
    to the test harness.
    
    This commit refactors this to have equivalent behaviour directly in the
    AbstractStreamOperatorTestHarness class.
---
 .../fs/bucketing/BucketingSinkMigrationTest.java   |  7 +---
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 25 ++++--------
 .../kafka/FlinkKafkaConsumerBaseTest.java          |  2 +-
 .../kinesis/FlinkKinesisConsumerMigrationTest.java | 19 +++++-----
 .../kinesis/FlinkKinesisConsumerTest.java          |  2 +-
 .../ContinuousFileProcessingMigrationTest.java     | 13 ++-----
 .../flink/cep/operator/CEPMigrationTest.java       | 28 ++++++--------
 .../windowing/WindowOperatorMigrationTest.java     | 43 +++++++--------------
 .../util/AbstractStreamOperatorTestHarness.java    | 10 ++++-
 .../util/migration/MigrationTestUtil.java          | 44 ----------------------
 10 files changed, 59 insertions(+), 134 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 44be702..8d865b3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
@@ -167,11 +166,9 @@ public class BucketingSinkMigrationTest {
 			new StreamSink<>(sink), 10, 1, 0);
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index be468a0..fbb3732 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -207,11 +206,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));
 
 		testHarness.open();
 
@@ -248,11 +245,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));
 
 		testHarness.open();
 
@@ -302,11 +297,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -349,11 +342,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 		// restore state from binary snapshot file; should fail since discovery is enabled
 		try {
-			MigrationTestUtil.restoreFromSnapshot(
-				testHarness,
+			testHarness.initializeState(
 				OperatorSnapshotUtil.getResourceFilename(
-					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
-				testMigrateVersion);
+					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 			fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
 		} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index dbe7630..40bb580 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -515,7 +515,7 @@ public class FlinkKafkaConsumerBaseTest {
 			testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
 
 			// initializeState() is always called, null signals that we didn't restore
-			testHarnesses[i].initializeState(null);
+			testHarnesses[i].initializeEmptyState();
 			testHarnesses[i].open();
 		}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index e21a880..f36b661 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -148,9 +147,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
@@ -204,9 +203,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
@@ -285,9 +284,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 84e18bd..d36d68a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -763,7 +763,7 @@ public class FlinkKinesisConsumerTest {
 		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
 
-		testHarness.initializeState(null);
+		testHarness.initializeEmptyState();
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Watermark> watermarks = new ConcurrentLinkedQueue<>();
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 7f7e0c5..0a3f75e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
@@ -176,11 +175,9 @@ public class ContinuousFileProcessingMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"reader-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -307,11 +304,9 @@ public class ContinuousFileProcessingMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 061a3c6..0461bd6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 
 import org.junit.Ignore;
@@ -158,10 +157,8 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -320,10 +317,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -486,10 +482,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -579,10 +574,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-conditions-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-conditions-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index f4883c9..d2ef199 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -53,7 +53,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
@@ -184,11 +183,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -278,11 +275,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -402,11 +397,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -513,11 +506,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -620,11 +611,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -719,11 +708,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -837,11 +824,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 0e120eb..3af630a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -340,6 +340,14 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		initializeState(operatorStateHandles, null);
 	}
 
+	public void initializeState(String operatorStateSnapshotPath) throws Exception {
+		initializeState(OperatorSnapshotUtil.readStateHandle(operatorStateSnapshotPath));
+	}
+
+	public void initializeEmptyState() throws Exception {
+		initializeState((OperatorSubtaskState) null);
+	}
+
 	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState()}.
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
@@ -485,7 +493,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 */
 	public void open() throws Exception {
 		if (!initializeCalled) {
-			initializeState(null);
+			initializeEmptyState();
 		}
 		operator.open();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
deleted file mode 100644
index 1c95a04..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.migration;
-
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-
-/**
- * Utility methods for testing snapshot migrations.
- */
-public class MigrationTestUtil {
-
-	/**
-	 * Restore from a snapshot taken with an older Flink version.
-	 *
-	 * @param testHarness          the test harness to restore the snapshot to.
-	 * @param snapshotPath         the absolute path to the snapshot.
-	 * @param snapshotFlinkVersion the Flink version of the snapshot.
-	 * @throws Exception
-	 */
-	public static void restoreFromSnapshot(
-		AbstractStreamOperatorTestHarness<?> testHarness,
-		String snapshotPath,
-		MigrationVersion snapshotFlinkVersion) throws Exception {
-
-		testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
-	}
-}