You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:30 UTC

[flink] 01/03: [FLINK-26146] Add tests of flink version upgrades to cover native snapshots

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b9f8956a1452a8b1259257af926eac7ad9e3bec
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:49 2022 +0100

    [FLINK-26146] Add tests of flink version upgrades to cover native snapshots
    
    Turned SavepointMigrationTestBase into SnapshotMigrationTestBase that supports testing canconical
    savepoints, native savepoints, and checkpoints; and adapted sub classes accordingly.
---
 .../LegacyStatefulJobSavepointMigrationITCase.java |   8 +-
 ...ava => StatefulJobSnapshotMigrationITCase.java} | 145 ++++++++-------
 .../StatefulJobWBroadcastStateMigrationITCase.java | 139 +++++++-------
 .../checkpointing/utils/MigrationTestUtils.java    |   4 +-
 ...estBase.java => SnapshotMigrationTestBase.java} | 200 +++++++++++++++++++--
 .../TypeSerializerSnapshotMigrationITCase.java     | 155 ++++++++--------
 pom.xml                                            |   4 +-
 7 files changed, 430 insertions(+), 225 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
similarity index 99%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
index e59be25..fe16a25 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Ignore;
@@ -62,7 +63,7 @@ import static org.junit.Assert.assertEquals;
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class LegacyStatefulJobSavepointMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
@@ -143,11 +144,12 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
                 .uid("TimelyStatefulOperator")
                 .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
 
-        executeAndSavepoint(
+        executeAndSnapshot(
                 env,
                 "src/test/resources/"
                         + getSavepointPath(
                                 flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
+                SnapshotType.SAVEPOINT_CANONICAL,
                 new Tuple2<>(
                         AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
similarity index 76%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
index 260c1f2..df2a20b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -28,8 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -40,14 +41,17 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 
@@ -56,59 +60,69 @@ import static org.junit.Assert.assertEquals;
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    /**
-     * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
-     * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
-     * checking functions.
-     */
-    public enum ExecutionMode {
-        PERFORM_SAVEPOINT,
-        VERIFY_SAVEPOINT
-    }
+    // TODO increase this to newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_4, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_4, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public StatefulJobSavepointMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public StatefulJobSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
@@ -119,13 +133,16 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -140,7 +157,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
         OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -149,7 +166,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
                             NUM_SOURCE_ELEMENTS);
             flatMap = new CheckpointingKeyedStateFlatMap();
             timelyOperator = new CheckpointingTimelyStatefulOperator();
-        } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -190,17 +207,20 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
                 .uid("CheckpointingTimelyStatefulOperator2")
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
+                    "src/test/resources/" + snapshotPath,
+                    snapshotSpec.getSnapshotType(),
                     new Tuple2<>(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS * 2));
         } else {
             restoreAndExecute(
                     env,
-                    getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+                    getResourceFilename(snapshotPath),
                     new Tuple2<>(
                             MigrationTestUtils.CheckingNonParallelSourceWithListState
                                     .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -228,17 +248,8 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         }
     }
 
-    private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                return "new-stateful-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
-                return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private static String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "new-stateful-udf-migration-itcase-" + snapshotSpec;
     }
 
     private static class CheckpointingKeyedStateFlatMap
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
similarity index 80%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
index 4470b85..459ce1b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -25,14 +25,17 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -40,10 +43,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to
@@ -51,48 +55,69 @@ import java.util.Map;
  * state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO increase this to newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
+
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
-            StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_5, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_5, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public StatefulJobWBroadcastStateMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public StatefulJobWBroadcastStateMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
@@ -103,13 +128,16 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -148,7 +176,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         expectedThirdState.put(2L, "2");
         expectedThirdState.put(3L, "3");
 
-        if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -163,8 +191,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
                             NUM_SOURCE_ELEMENTS);
             firstBroadcastFunction = new CheckpointingKeyedBroadcastFunction();
             secondBroadcastFunction = new CheckpointingKeyedSingleBroadcastFunction();
-        } else if (executionMode
-                == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -252,19 +279,18 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
                 .uid("BrProcess2")
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/"
-                            + getBroadcastSavepointPath(testMigrateVersion, testStateBackend),
+                    "src/test/resources/" + getSnapshotPath(snapshotSpec),
+                    snapshotSpec.getSnapshotType(),
                     new Tuple2<>(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             2 * NUM_SOURCE_ELEMENTS));
         } else {
             restoreAndExecute(
                     env,
-                    getResourceFilename(
-                            getBroadcastSavepointPath(testMigrateVersion, testStateBackend)),
+                    getResourceFilename(getSnapshotPath(snapshotSpec)),
                     new Tuple2<>(
                             MigrationTestUtils.CheckingNonParallelSourceWithListState
                                     .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -279,19 +305,8 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         }
     }
 
-    private String getBroadcastSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                return "new-stateful-broadcast-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
-                return "new-stateful-broadcast-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "new-stateful-broadcast-udf-migration-itcase-" + snapshotSpec;
     }
 
     /**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
index 85ff84c..9fd021e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
@@ -110,7 +110,7 @@ public class MigrationTestUtils {
 
         private static final long serialVersionUID = 1L;
 
-        static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                 CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
 
         private volatile boolean isRunning = true;
@@ -251,7 +251,7 @@ public class MigrationTestUtils {
 
         private static final long serialVersionUID = 1L;
 
-        static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                 CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
 
         private volatile boolean isRunning = true;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
similarity index 53%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index f17f59c..57a4894 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.checkpointing.utils;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
@@ -33,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -46,9 +48,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URL;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -58,18 +64,154 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 /** Test savepoint migration. */
-public abstract class SavepointMigrationTestBase extends TestBaseUtils {
+
+/**
+ * Base for testing snapshot migration. The base test supports snapshots types as defined in {@link
+ * SnapshotType}.
+ */
+public abstract class SnapshotMigrationTestBase extends TestBaseUtils {
 
     @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
     @Rule public final MiniClusterWithClientResource miniClusterResource;
 
-    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class);
 
     protected static final int DEFAULT_PARALLELISM = 4;
 
+    /**
+     * Modes for migration test execution. This enum is supposed to serve as a switch between two
+     * modes of test execution: 1) create snapshots and 2) verify snapshots:
+     */
+    public enum ExecutionMode {
+        /** Create binary snapshot(s), i.e. run the checkpointing functions. */
+        CREATE_SNAPSHOT,
+        /** Verify snapshot(s), i.e, restore snapshot and check execution result. */
+        VERIFY_SNAPSHOT
+    }
+
+    /** Types of snapshot supported by this base test. */
+    public enum SnapshotType {
+        /** Savepoints with Flink canonical format. */
+        SAVEPOINT_CANONICAL,
+        /** Savepoint with native format of respective state backend. */
+        SAVEPOINT_NATIVE,
+        /** Checkpoint. */
+        CHECKPOINT
+    }
+
+    /**
+     * A snapshot specification (immutable) for migration tests that consists of {@link
+     * FlinkVersion} that the snapshot has been created with, {@link
+     * SnapshotMigrationTestBase.SnapshotType}, and state backend type that the snapshot has been
+     * ctreated from.
+     */
+    public static class SnapshotSpec implements Serializable {
+        private final FlinkVersion flinkVersion;
+        private final String stateBackendType;
+        private final SnapshotMigrationTestBase.SnapshotType snapshotType;
+
+        /**
+         * Creates a {@link SnapshotSpec} with specified parameters.
+         *
+         * @param flinkVersion Specifies the {@link FlinkVersion}.
+         * @param stateBackendType Specifies the state backend type.
+         * @param snapshotType Specifies the {@link SnapshotMigrationTestBase.SnapshotType}.
+         */
+        public SnapshotSpec(
+                FlinkVersion flinkVersion,
+                String stateBackendType,
+                SnapshotMigrationTestBase.SnapshotType snapshotType) {
+            this.flinkVersion = flinkVersion;
+            this.stateBackendType = stateBackendType;
+            this.snapshotType = snapshotType;
+        }
+
+        /**
+         * Gets the {@link FlinkVersion} that the snapshot has been created with.
+         *
+         * @return {@link FlinkVersion}
+         */
+        public FlinkVersion getFlinkVersion() {
+            return flinkVersion;
+        }
+
+        /**
+         * Gets the state backend type that the snapshot has been created from.
+         *
+         * @return State backend type.
+         */
+        public String getStateBackendType() {
+            return stateBackendType;
+        }
+
+        /**
+         * Gets the {@link SnapshotMigrationTestBase.SnapshotType}.
+         *
+         * @return {@link SnapshotMigrationTestBase.SnapshotType}
+         */
+        public SnapshotMigrationTestBase.SnapshotType getSnapshotType() {
+            return snapshotType;
+        }
+
+        /**
+         * Creates a collection of {@link SnapshotSpec} for a given collection of {@link
+         * FlinkVersion} with the same parameters but different {@link FlinkVersion}.
+         *
+         * @param stateBackendType Specifies the state backend type.
+         * @param snapshotType Specifies the snapshot type.
+         * @param flinkVersions A collection of {@link FlinkVersion}.
+         * @return A collection of {@link SnapshotSpec} that differ only by means of {@link
+         *     FlinkVersion} FlinkVersion}.
+         */
+        public static Collection<SnapshotSpec> withVersions(
+                String stateBackendType,
+                SnapshotMigrationTestBase.SnapshotType snapshotType,
+                Collection<FlinkVersion> flinkVersions) {
+            List<SnapshotSpec> snapshotSpecCollection = new LinkedList<>();
+            for (FlinkVersion version : flinkVersions) {
+                snapshotSpecCollection.add(
+                        new SnapshotSpec(version, stateBackendType, snapshotType));
+            }
+            return snapshotSpecCollection;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder str = new StringBuilder("flink" + flinkVersion);
+            switch (stateBackendType) {
+                case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+                    str.append("-rocksdb");
+                    break;
+                case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+                    // This is implicit due to backwards compatibility with legacy artifact names.
+                    break;
+                case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                    str.append("-hashmap");
+                    break;
+                default:
+                    throw new UnsupportedOperationException("State backend type not supported.");
+            }
+            switch (snapshotType) {
+                case SAVEPOINT_CANONICAL:
+                    str.append("-savepoint");
+                    // Canonical implicit due to backwards compatibility with legacy artifact names.
+                    break;
+                case SAVEPOINT_NATIVE:
+                    str.append("-savepoint-native");
+                    break;
+                case CHECKPOINT:
+                    str.append("-checkpoint");
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Snapshot type not supported.");
+            }
+            return str.toString();
+        }
+    }
+
     protected static String getResourceFilename(String filename) {
-        ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+        ClassLoader cl = SnapshotMigrationTestBase.class.getClassLoader();
         URL resource = cl.getResource(filename);
         if (resource == null) {
             throw new NullPointerException("Missing snapshot resource.");
@@ -77,7 +219,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         return resource.getFile();
     }
 
-    protected SavepointMigrationTestBase() throws Exception {
+    protected SnapshotMigrationTestBase() throws Exception {
         miniClusterResource =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
@@ -115,10 +257,22 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         return config;
     }
 
+    @Deprecated
     @SafeVarargs
     protected final void executeAndSavepoint(
             StreamExecutionEnvironment env,
-            String savepointPath,
+            String snapshotPath,
+            Tuple2<String, Integer>... expectedAccumulators)
+            throws Exception {
+        executeAndSnapshot(
+                env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
+    }
+
+    @SafeVarargs
+    protected final void executeAndSnapshot(
+            StreamExecutionEnvironment env,
+            String snapshotPath,
+            SnapshotType snapshotType,
             Tuple2<String, Integer>... expectedAccumulators)
             throws Exception {
 
@@ -162,27 +316,41 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
             fail("Did not see the expected accumulator results within time limit.");
         }
 
-        LOG.info("Triggering savepoint.");
+        LOG.info("Triggering snapshot.");
 
-        CompletableFuture<String> savepointPathFuture =
-                client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+        CompletableFuture<String> snapshotPathFuture;
+        switch (snapshotType) {
+            case SAVEPOINT_CANONICAL:
+                snapshotPathFuture =
+                        client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+                break;
+            case SAVEPOINT_NATIVE:
+                snapshotPathFuture =
+                        client.triggerSavepoint(jobID, null, SavepointFormatType.NATIVE);
+                break;
+            case CHECKPOINT:
+                snapshotPathFuture = miniClusterResource.getMiniCluster().triggerCheckpoint(jobID);
+                break;
+            default:
+                throw new UnsupportedOperationException("Snapshot type not supported/implemented.");
+        }
 
-        String jobmanagerSavepointPath =
-                savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+        String jobmanagerSnapshotPath =
+                snapshotPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-        File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+        File jobManagerSnapshot = new File(new URI(jobmanagerSnapshotPath).getPath());
         // savepoints were changed to be directories in Flink 1.3
-        if (jobManagerSavepoint.isDirectory()) {
-            FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+        if (jobManagerSnapshot.isDirectory()) {
+            FileUtils.moveDirectory(jobManagerSnapshot, new File(snapshotPath));
         } else {
-            FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+            FileUtils.moveFile(jobManagerSnapshot, new File(snapshotPath));
         }
     }
 
     @SafeVarargs
     protected final void restoreAndExecute(
             StreamExecutionEnvironment env,
-            String savepointPath,
+            String snapshotPath,
             Tuple2<String, Integer>... expectedAccumulators)
             throws Exception {
 
@@ -193,7 +361,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         // Submit the job
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(snapshotPath));
 
         JobID jobID = client.submitJob(jobGraph).get();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index aa22407..96d607b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -29,104 +29,117 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 /**
  * Migration IT cases for upgrading a legacy {@link TypeSerializerConfigSnapshot} that is written in
  * checkpoints to {@link TypeSerializerSnapshot} interface.
  *
- * <p>The savepoints used by this test were written with a serializer snapshot class that extends
+ * <p>The snapshots used by this test were written with a serializer snapshot class that extends
  * {@link TypeSerializerConfigSnapshot}, as can be seen in the commented out code at the end of this
  * class. On restore, we change the snapshot to implement directly a {@link TypeSerializerSnapshot}.
  */
 @RunWith(Parameterized.class)
-public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTestBase {
+public class TypeSerializerSnapshotMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    /**
-     * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
-     * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
-     * checking functions.
-     */
-    public enum ExecutionMode {
-        PERFORM_SAVEPOINT,
-        VERIFY_SAVEPOINT
-    }
+    // TODO increase this to newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_3, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_3, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public TypeSerializerSnapshotMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public TypeSerializerSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
-    public void testSavepoint() throws Exception {
+    public void testSnapshot() throws Exception {
         final int parallelism = 1;
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -145,36 +158,30 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes
                 .map(new TestMapFunction())
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
-                    new Tuple2<>(
+                    "src/test/resources/" + snapshotPath,
+                    snapshotSpec.getSnapshotType(),
+                    Tuple2.of(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS));
-        } else {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             restoreAndExecute(
                     env,
-                    getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
-                    new Tuple2<>(
+                    getResourceFilename(snapshotPath),
+                    Tuple2.of(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS));
+        } else {
+            throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
         }
     }
 
-    private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case "rocksdb":
-                return "type-serializer-snapshot-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case "jobmanager":
-                return "type-serializer-snapshot-migration-itcase-flink"
-                        + savepointVersion
-                        + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "type-serializer-snapshot-migration-itcase-" + snapshotSpec;
     }
 
     private static class TestMapFunction
diff --git a/pom.xml b/pom.xml
index 14de924..d471a8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1494,7 +1494,9 @@ under the License.
 						<exclude>**/src/test/resources/**/test-data</exclude>
 						<exclude>**/src/test/resources/*-snapshot</exclude>
 						<exclude>**/src/test/resources/*.snapshot</exclude>
-						<exclude>**/src/test/resources/*-savepoint</exclude>
+						<exclude>**/src/test/resources/*-savepoint/**</exclude>
+						<exclude>**/src/test/resources/*-savepoint-native/**</exclude>
+						<exclude>**/src/test/resources/*-checkpoint/**</exclude>
 						<exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude>
 						<exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude>
 						<exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude>